This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new dd15d59  [BEAM-621] Add MapKeys, MapValues transforms (#14273)
dd15d59 is described below

commit dd15d59fc82fb50d3cd511370bd78034c6a99074
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Sat May 29 01:45:09 2021 +0300

    [BEAM-621] Add MapKeys, MapValues transforms (#14273)
    
    * [BEAM-621] Add MapKeys and MapValues PTransforms
    
    * [BEAM-621] Add MapToKeys and MapToValues PTransforms
    
    * [BEAM-621] Fix code style
    
    * [BEAM-621] Set Coder in tests
    
    * [BEAM-621] Fix assertion in testMapValues
    
    * [BEAM-621] Fix type variable names according to the CheckStyle
    
    * [BEAM-621] Add Apache License header in tests
    
    * [BEAM-621] Fix violations according to Spotless
    
    * [BEAM-621] Fix violations according to Spotless
    
    * [BEAM-621] Fix violations according to Spotless
    
    * [BEAM-621] Remove SuppressWarnings in tests
    
    * [BEAM-621] Use ImmutableList.of() instead of double-brace initialization
    
    * [BEAM-621] Remove redundant classes, fix javadoc
    
    * [BEAM-621] Handle exceptions in MapKeys and MapValues
    
    * [BEAM-621] Fix CheckStyle
    
    * [BEAM-621] Fix Spotless
    
    * [BEAM-621] Fix nullness issues
---
 .../org/apache/beam/sdk/transforms/MapKeys.java    | 163 ++++++++++++++++++++
 .../org/apache/beam/sdk/transforms/MapValues.java  | 164 +++++++++++++++++++++
 .../beam/sdk/transforms/SimpleMapWithFailures.java |  75 ++++++++++
 .../apache/beam/sdk/transforms/MapKeysTest.java    | 119 +++++++++++++++
 .../apache/beam/sdk/transforms/MapValuesTest.java  | 119 +++++++++++++++
 5 files changed, 640 insertions(+)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapKeys.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapKeys.java
new file mode 100644
index 0000000..579bbc6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapKeys.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.transforms.Contextful.Fn;
+import org.apache.beam.sdk.transforms.WithFailures.ExceptionElement;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.RequiresNonNull;
+
+/**
+ * {@code MapKeys} maps a {@code SerializableFunction<K1,K2>} over keys of a 
{@code
+ * PCollection<KV<K1,V>>} and returns a {@code PCollection<KV<K2, V>>}.
+ *
+ * <p>Example of use:
+ *
+ * <pre>{@code
+ * PCollection<KV<Integer, String>> input = ...;
+ * PCollection<KV<Double, String> output =
+ *      
input.apply(MapKeys.into(TypeDescriptors.doubles()).via(Integer::doubleValue));
+ * }</pre>
+ *
+ * <p>See also {@link MapValues}.
+ *
+ * @param <K1> the type of the keys in the input {@code PCollection}
+ * @param <K2> the type of the keys in the output {@code PCollection}
+ */
+public class MapKeys<K1, K2, V> extends PTransform<PCollection<KV<K1, V>>, 
PCollection<KV<K2, V>>> {
+
+  private final transient TypeDescriptor<K2> outputType;
+  private final @Nullable Contextful<Fn<KV<K1, V>, KV<K2, V>>> fn;
+
+  /**
+   * Returns a {@code MapKeys<K1, K2, V>} {@code PTransform} for a {@code 
ProcessFunction<NewK1,
+   * K2>} with predefined {@link #outputType}.
+   *
+   * @param <NewKeyT> the type of the keys in the input {@code PCollection}
+   * @param <NewValueT> the type of the values in the input and output {@code 
PCollection}s
+   */
+  public <NewValueT, NewKeyT> MapKeys<NewKeyT, K2, NewValueT> via(
+      SerializableFunction<NewKeyT, K2> fn) {
+    return new MapKeys<>(
+        Contextful.fn(
+            ((element, c) -> KV.of(fn.apply(element.getKey()), 
element.getValue())),
+            Requirements.empty()),
+        outputType);
+  }
+
+  /**
+   * Returns a new {@link MapKeys} transform with the given type descriptor 
for the output type, but
+   * the mapping function yet to be specified using {@link 
#via(SerializableFunction)}.
+   */
+  public static <K2> MapKeys<?, K2, ?> into(final TypeDescriptor<K2> 
outputType) {
+    return new MapKeys<>(null, outputType);
+  }
+
+  private MapKeys(
+      @Nullable Contextful<Fn<KV<K1, V>, KV<K2, V>>> fn, TypeDescriptor<K2> 
outputType) {
+    this.fn = fn;
+    this.outputType = outputType;
+  }
+
+  /**
+   * Returns a new {@link SimpleMapWithFailures} transform that catches 
exceptions raised while
+   * mapping elements, with the given type descriptor used for the failure 
collection but the
+   * exception handler yet to be specified using {@link
+   * SimpleMapWithFailures#exceptionsVia(ProcessFunction)}.
+   *
+   * <p>See {@link WithFailures} documentation for usage patterns of the 
returned {@link
+   * WithFailures.Result}.
+   *
+   * <p>Example usage:
+   *
+   * <pre>{@code
+   * Result<PCollection<KV<Integer, String>>, String> result =
+   *         input.apply(
+   *             MapKeys.into(TypeDescriptors.integers())
+   *                 .<String, String>via(word -> 1 / word.length)  // Could 
throw ArithmeticException
+   *                 .exceptionsInto(TypeDescriptors.strings())
+   *                 .exceptionsVia(ee -> ee.exception().getMessage()));
+   * PCollection<KV<Integer, String>> output = result.output();
+   * PCollection<String> failures = result.failures();
+   * }</pre>
+   */
+  @RequiresNonNull("fn")
+  public <FailureT> SimpleMapWithFailures<KV<K1, V>, KV<K2, V>, FailureT> 
exceptionsInto(
+      TypeDescriptor<FailureT> failureTypeDescriptor) {
+    return new SimpleMapWithFailures<>(
+        "MapKeysWithFailures", fn, getKvTypeDescriptor(), null, 
failureTypeDescriptor);
+  }
+
+  /**
+   * Returns a new {@link SimpleMapWithFailures} transform that catches 
exceptions raised while
+   * mapping elements, passing the raised exception instance and the input 
element being processed
+   * through the given {@code exceptionHandler} and emitting the result to a 
failure collection.
+   *
+   * <p>This method takes advantage of the type information provided by {@link 
InferableFunction},
+   * meaning that a call to {@link #exceptionsInto(TypeDescriptor)} may not be 
necessary.
+   *
+   * <p>See {@link WithFailures} documentation for usage patterns of the 
returned {@link
+   * WithFailures.Result}.
+   *
+   * <p>Example usage:
+   *
+   * <pre>{@code
+   * Result<PCollection<KV<Integer, String>>, String> result =
+   *         input.apply(
+   *             MapKeys.into(TypeDescriptors.integers())
+   *                 .<String, String>via(word -> 1 / word.length)  // Could 
throw ArithmeticException
+   *                 .exceptionsVia(
+   *                     new InferableFunction<ExceptionElement<KV<String, 
String>>, String>() {
+   *                       @Override
+   *                       public String apply(ExceptionElement<KV<String, 
String>> input) {
+   *                         return input.exception().getMessage();
+   *                       }
+   *                     }));
+   * PCollection<KV<Integer, String>> output = result.output();
+   * PCollection<String> failures = result.failures();
+   * }</pre>
+   */
+  @RequiresNonNull("fn")
+  public <FailureT> SimpleMapWithFailures<KV<K1, V>, KV<K2, V>, FailureT> 
exceptionsVia(
+      InferableFunction<ExceptionElement<KV<K1, V>>, FailureT> 
exceptionHandler) {
+    return new SimpleMapWithFailures<>(
+        "MapKeysWithFailures",
+        fn,
+        getKvTypeDescriptor(),
+        exceptionHandler,
+        exceptionHandler.getOutputTypeDescriptor());
+  }
+
+  @Override
+  public PCollection<KV<K2, V>> expand(PCollection<KV<K1, V>> input) {
+    return input.apply(
+        "MapKeys",
+        MapElements.into(getKvTypeDescriptor())
+            .via(checkNotNull(fn, "Must specify a function on MapKeys using 
.via()")));
+  }
+
+  private TypeDescriptor<KV<K2, V>> getKvTypeDescriptor() {
+    return new TypeDescriptor<KV<K2, V>>() {}.where(new TypeParameter<K2>() 
{}, outputType);
+  }
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapValues.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapValues.java
new file mode 100644
index 0000000..ae1fe8e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapValues.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.transforms.Contextful.Fn;
+import org.apache.beam.sdk.transforms.WithFailures.ExceptionElement;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.RequiresNonNull;
+
+/**
+ * {@code MapValues} maps a {@code SerializableFunction<V1,V2>} over values of 
a {@code
+ * PCollection<KV<K,V1>>} and returns a {@code PCollection<KV<K, V2>>}.
+ *
+ * <p>Example of use:
+ *
+ * <pre>{@code
+ * PCollection<KV<String, Integer>> input = ...;
+ * PCollection<KV<String, Double> output =
+ *      
input.apply(MapValues.into(TypeDescriptors.doubles()).via(Integer::doubleValue));
+ * }</pre>
+ *
+ * <p>See also {@link MapKeys}.
+ *
+ * @param <V1> the type of the values in the input {@code PCollection}
+ * @param <V2> the type of the elements in the output {@code PCollection}
+ */
+public class MapValues<K, V1, V2>
+    extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, V2>>> {
+
+  private final transient TypeDescriptor<V2> outputType;
+  private final @Nullable Contextful<Fn<KV<K, V1>, KV<K, V2>>> fn;
+
+  /**
+   * Returns a {@link MapValues} transform for a {@code ProcessFunction<NewV1, 
V2>} with predefined
+   * {@link #outputType}.
+   *
+   * @param <NewKeyT> the type of the keys in the input and output {@code 
PCollection}s
+   * @param <NewValueT> the type of the values in the input {@code PCollection}
+   */
+  public <NewKeyT, NewValueT> MapValues<NewKeyT, NewValueT, V2> via(
+      SerializableFunction<NewValueT, V2> fn) {
+    return new MapValues<>(
+        Contextful.fn(
+            ((element, c) -> KV.of(element.getKey(), 
fn.apply(element.getValue()))),
+            Requirements.empty()),
+        outputType);
+  }
+
+  /**
+   * Returns a new {@link MapValues} transform with the given type descriptor 
for the output type,
+   * but the mapping function yet to be specified using {@link 
#via(SerializableFunction)}.
+   */
+  public static <V2> MapValues<?, ?, V2> into(final TypeDescriptor<V2> 
outputType) {
+    return new MapValues<>(null, outputType);
+  }
+
+  private MapValues(
+      @Nullable Contextful<Fn<KV<K, V1>, KV<K, V2>>> fn, TypeDescriptor<V2> 
outputType) {
+    this.fn = fn;
+    this.outputType = outputType;
+  }
+
+  /**
+   * Returns a new {@link SimpleMapWithFailures} transform that catches 
exceptions raised while
+   * mapping elements, with the given type descriptor used for the failure 
collection but the
+   * exception handler yet to be specified using {@link
+   * SimpleMapWithFailures#exceptionsVia(ProcessFunction)}.
+   *
+   * <p>See {@link WithFailures} documentation for usage patterns of the 
returned {@link
+   * WithFailures.Result}.
+   *
+   * <p>Example usage:
+   *
+   * <pre>{@code
+   * Result<PCollection<KV<String, Integer>>, String> result =
+   *         input.apply(
+   *             MapValues.into(TypeDescriptors.integers())
+   *                 .<String, String>via(word -> 1 / word.length)  // Could 
throw ArithmeticException
+   *                 .exceptionsInto(TypeDescriptors.strings())
+   *                 .exceptionsVia(ee -> ee.exception().getMessage()));
+   * PCollection<KV<String, Integer>> output = result.output();
+   * PCollection<String> failures = result.failures();
+   * }</pre>
+   */
+  @RequiresNonNull("fn")
+  public <FailureT> SimpleMapWithFailures<KV<K, V1>, KV<K, V2>, FailureT> 
exceptionsInto(
+      TypeDescriptor<FailureT> failureTypeDescriptor) {
+    return new SimpleMapWithFailures<>(
+        "MapValuesWithFailures", fn, getKvTypeDescriptor(), null, 
failureTypeDescriptor);
+  }
+
+  /**
+   * Returns a new {@link SimpleMapWithFailures} transform that catches 
exceptions raised while
+   * mapping elements, passing the raised exception instance and the input 
element being processed
+   * through the given {@code exceptionHandler} and emitting the result to a 
failure collection.
+   *
+   * <p>This method takes advantage of the type information provided by {@link 
InferableFunction},
+   * meaning that a call to {@link #exceptionsInto(TypeDescriptor)} may not be 
necessary.
+   *
+   * <p>See {@link WithFailures} documentation for usage patterns of the 
returned {@link
+   * WithFailures.Result}.
+   *
+   * <p>Example usage:
+   *
+   * <pre>{@code
+   * Result<PCollection<KV<String, Integer>>, String> result =
+   *         input.apply(
+   *             MapValues.into(TypeDescriptors.integers())
+   *                 .<String, String>via(word -> 1 / word.length)  // Could 
throw ArithmeticException
+   *                 .exceptionsVia(
+   *                     new InferableFunction<ExceptionElement<KV<String, 
String>>, String>() {
+   *                       @Override
+   *                       public String apply(ExceptionElement<KV<String, 
String>> input) {
+   *                         return input.exception().getMessage();
+   *                       }
+   *                     }));
+   * PCollection<KV<String, Integer>> output = result.output();
+   * PCollection<String> failures = result.failures();
+   * }</pre>
+   */
+  @RequiresNonNull("fn")
+  public <FailureT> SimpleMapWithFailures<KV<K, V1>, KV<K, V2>, FailureT> 
exceptionsVia(
+      InferableFunction<ExceptionElement<KV<K, V1>>, FailureT> 
exceptionHandler) {
+    return new SimpleMapWithFailures<>(
+        "MapValuesWithFailures",
+        fn,
+        getKvTypeDescriptor(),
+        exceptionHandler,
+        exceptionHandler.getOutputTypeDescriptor());
+  }
+
+  @Override
+  public PCollection<KV<K, V2>> expand(PCollection<KV<K, V1>> input) {
+    return input.apply(
+        "MapValues",
+        MapElements.into(getKvTypeDescriptor())
+            .via(checkNotNull(fn, "Must specify a function on MapValues using 
.via()")));
+  }
+
+  private TypeDescriptor<KV<K, V2>> getKvTypeDescriptor() {
+    return new TypeDescriptor<KV<K, V2>>() {}.where(new TypeParameter<V2>() 
{}, outputType);
+  }
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleMapWithFailures.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleMapWithFailures.java
new file mode 100644
index 0000000..8d58f36
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleMapWithFailures.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import org.apache.beam.sdk.transforms.Contextful.Fn;
+import org.apache.beam.sdk.transforms.WithFailures.ExceptionElement;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A {@code PTransform} that adds exception handling to {@link MapKeys} and 
{@link MapValues} using
+ * {@link MapElements.MapWithFailures}.
+ */
+class SimpleMapWithFailures<InputT, OutputT, FailureT>
+    extends PTransform<PCollection<InputT>, 
WithFailures.Result<PCollection<OutputT>, FailureT>> {
+
+  private final transient TypeDescriptor<OutputT> outputType;
+  private final Contextful<Fn<InputT, OutputT>> fn;
+  private final transient TypeDescriptor<FailureT> failureType;
+  private final @Nullable ProcessFunction<ExceptionElement<InputT>, FailureT> 
exceptionHandler;
+  private final String transformName;
+
+  SimpleMapWithFailures(
+      String transformName,
+      Contextful<Fn<InputT, OutputT>> fn,
+      TypeDescriptor<OutputT> outputType,
+      @Nullable ProcessFunction<ExceptionElement<InputT>, FailureT> 
exceptionHandler,
+      TypeDescriptor<FailureT> failureType) {
+    this.transformName = transformName;
+    this.fn = fn;
+    this.outputType = outputType;
+    this.exceptionHandler = exceptionHandler;
+    this.failureType = failureType;
+  }
+
+  @Override
+  public WithFailures.Result<PCollection<OutputT>, FailureT> 
expand(PCollection<InputT> input) {
+    if (exceptionHandler == null) {
+      throw new NullPointerException(".exceptionsVia() is required");
+    }
+    return input.apply(
+        transformName,
+        MapElements.into(outputType)
+            .via(fn)
+            .exceptionsInto(failureType)
+            .exceptionsVia(exceptionHandler));
+  }
+
+  /**
+   * Returns a {@code PTransform} that catches exceptions raised while mapping 
elements, passing the
+   * raised exception instance and the input element being processed through 
the given {@code
+   * exceptionHandler} and emitting the result to a failure collection.
+   */
+  public SimpleMapWithFailures<InputT, OutputT, FailureT> exceptionsVia(
+      ProcessFunction<ExceptionElement<InputT>, FailureT> exceptionHandler) {
+    return new SimpleMapWithFailures<>(
+        transformName, fn, outputType, exceptionHandler, failureType);
+  }
+}
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapKeysTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapKeysTest.java
new file mode 100644
index 0000000..659388b
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapKeysTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link MapKeys} transform. */
+@RunWith(JUnit4.class)
+public class MapKeysTest {
+
+  private static final List<KV<Integer, String>> TABLE =
+      ImmutableList.of(KV.of(1, "one"), KV.of(2, "two"), KV.of(3, "none"));
+  private static final List<KV<String, String>> WORDS_TABLE =
+      ImmutableList.of(
+          KV.of("one", "Length = 3"), KV.of("three", "Length = 4"), KV.of("", 
"Length = 0"));
+
+  private static final List<KV<Integer, String>> EMPTY_TABLE = new 
ArrayList<>();
+  public static final String EXPECTED_FAILURE_MESSAGE = "/ by zero";
+
+  @Rule public final TestPipeline p = TestPipeline.create();
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testMapKeysInto() {
+
+    PCollection<KV<Integer, String>> input =
+        p.apply(
+            Create.of(TABLE)
+                .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), 
StringUtf8Coder.of())));
+
+    PCollection<KV<Double, String>> output =
+        input
+            .apply(
+                MapKeys.into(TypeDescriptors.doubles())
+                    .via((SerializableFunction<Integer, Double>) input1 -> 
input1 * 2d))
+            .setCoder(KvCoder.of(DoubleCoder.of(), StringUtf8Coder.of()));
+
+    PAssert.that(output)
+        .containsInAnyOrder(
+            ImmutableList.of(KV.of(2.0d, "one"), KV.of(4.0d, "two"), 
KV.of(6.0d, "none")));
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testMapKeysWithFailures() {
+
+    PCollection<KV<String, String>> input =
+        p.apply(
+            Create.of(WORDS_TABLE)
+                .withCoder(KvCoder.of(StringUtf8Coder.of(), 
StringUtf8Coder.of())));
+
+    WithFailures.Result<PCollection<KV<Integer, String>>, String> result =
+        input.apply(
+            MapKeys.into(TypeDescriptors.integers())
+                .<String, String>via(word -> 1 / word.length())
+                .exceptionsInto(TypeDescriptors.strings())
+                .exceptionsVia(ee -> ee.exception().getMessage()));
+    result.output().setCoder(KvCoder.of(BigEndianIntegerCoder.of(), 
StringUtf8Coder.of()));
+
+    PAssert.that(result.output())
+        .containsInAnyOrder(ImmutableList.of(KV.of(0, "Length = 3"), KV.of(0, 
"Length = 4")));
+    
PAssert.that(result.failures()).containsInAnyOrder(EXPECTED_FAILURE_MESSAGE);
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testMapKeysEmpty() {
+
+    PCollection<KV<Integer, String>> input =
+        p.apply(
+            Create.of(EMPTY_TABLE)
+                .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), 
StringUtf8Coder.of())));
+
+    PCollection<KV<Double, String>> output =
+        input
+            
.apply(MapKeys.into(TypeDescriptors.doubles()).via(Integer::doubleValue))
+            .setCoder(KvCoder.of(DoubleCoder.of(), StringUtf8Coder.of()));
+
+    PAssert.that(output).empty();
+
+    p.run();
+  }
+}
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapValuesTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapValuesTest.java
new file mode 100644
index 0000000..e32d6e9
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapValuesTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link MapValues} transform. */
+@RunWith(JUnit4.class)
+public class MapValuesTest {
+
+  private static final List<KV<String, Integer>> TABLE =
+      ImmutableList.of(KV.of("one", 1), KV.of("two", 2), KV.of("dup", 2));
+  private static final List<KV<String, String>> WORDS_TABLE =
+      ImmutableList.of(
+          KV.of("Length = 3", "one"), KV.of("Length = 4", "three"), 
KV.of("Length = 0", ""));
+
+  private static final List<KV<String, Integer>> EMPTY_TABLE = new 
ArrayList<>();
+  public static final String EXPECTED_FAILURE_MESSAGE = "/ by zero";
+
+  @Rule public final TestPipeline p = TestPipeline.create();
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testMapValuesInto() {
+
+    PCollection<KV<String, Integer>> input =
+        p.apply(
+            Create.of(TABLE)
+                .withCoder(KvCoder.of(StringUtf8Coder.of(), 
BigEndianIntegerCoder.of())));
+
+    PCollection<KV<String, Double>> output =
+        input
+            .apply(
+                MapValues.into(TypeDescriptors.doubles())
+                    .via((SerializableFunction<Integer, Double>) input1 -> 
input1 * 2d))
+            .setCoder(KvCoder.of(StringUtf8Coder.of(), DoubleCoder.of()));
+
+    PAssert.that(output)
+        .containsInAnyOrder(
+            ImmutableList.of(KV.of("one", 2.0d), KV.of("two", 4.0d), 
KV.of("dup", 4.0d)));
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testMapValuesWithFailures() {
+
+    PCollection<KV<String, String>> input =
+        p.apply(
+            Create.of(WORDS_TABLE)
+                .withCoder(KvCoder.of(StringUtf8Coder.of(), 
StringUtf8Coder.of())));
+
+    WithFailures.Result<PCollection<KV<String, Integer>>, String> result =
+        input.apply(
+            MapValues.into(TypeDescriptors.integers())
+                .<String, String>via(word -> 1 / word.length())
+                .exceptionsInto(TypeDescriptors.strings())
+                .exceptionsVia(ee -> ee.exception().getMessage()));
+    result.output().setCoder(KvCoder.of(StringUtf8Coder.of(), 
BigEndianIntegerCoder.of()));
+
+    PAssert.that(result.output())
+        .containsInAnyOrder(ImmutableList.of(KV.of("Length = 3", 0), 
KV.of("Length = 4", 0)));
+    
PAssert.that(result.failures()).containsInAnyOrder(EXPECTED_FAILURE_MESSAGE);
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testMapValuesEmpty() {
+
+    PCollection<KV<String, Integer>> input =
+        p.apply(
+            Create.of(EMPTY_TABLE)
+                .withCoder(KvCoder.of(StringUtf8Coder.of(), 
BigEndianIntegerCoder.of())));
+
+    PCollection<KV<String, Double>> output =
+        input
+            
.apply(MapValues.into(TypeDescriptors.doubles()).via(Integer::doubleValue))
+            .setCoder(KvCoder.of(StringUtf8Coder.of(), DoubleCoder.of()));
+
+    PAssert.that(output).empty();
+
+    p.run();
+  }
+}

Reply via email to