Repository: crunch
Updated Branches:
  refs/heads/master 0a6c8decb -> f12eab83e
CRUNCH-469: Fix CCE in crunch-spark InputTables

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f12eab83
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f12eab83
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f12eab83

Branch: refs/heads/master
Commit: f12eab83e18caed24a168c5753902c50e2b4c1a5
Parents: 0a6c8de
Author: Josh Wills <[email protected]>
Authored: Thu Sep 4 13:17:26 2014 -0700
Committer: Josh Wills <[email protected]>
Committed: Thu Sep 4 13:17:26 2014 -0700

----------------------------------------------------------------------
 .../apache/crunch/impl/spark/GuavaUtils.java    |  2 +-
 .../crunch/impl/spark/collect/InputTable.java   |  4 +-
 .../crunch/impl/spark/fn/Tuple2MapFunction.java | 45 ++++++++++++++++++++
 3 files changed, 48 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/f12eab83/crunch-spark/src/main/java/org/apache/crunch/impl/spark/GuavaUtils.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/GuavaUtils.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/GuavaUtils.java
index 400ae7b..cb67473 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/GuavaUtils.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/GuavaUtils.java
@@ -28,7 +28,7 @@ public class GuavaUtils {
     return new Function<Tuple2<K, V>, Pair<K, V>>() {
       @Override
       public Pair<K, V> apply(@Nullable Tuple2<K, V> kv) {
-        return kv == null ? null : Pair.of(kv._1, kv._2);
+        return kv == null ? null : Pair.of(kv._1(), kv._2());
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/f12eab83/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
index e83d912..a0f7189 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
@@ -27,7 +27,7 @@ import org.apache.crunch.impl.mr.run.CrunchInputFormat;
 import org.apache.crunch.impl.spark.SparkCollection;
 import org.apache.crunch.impl.spark.SparkRuntime;
 import org.apache.crunch.impl.spark.fn.InputConverterFunction;
-import org.apache.crunch.impl.spark.fn.PairMapFunction;
+import org.apache.crunch.impl.spark.fn.Tuple2MapFunction;
 import org.apache.crunch.types.Converter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -56,7 +56,7 @@ public class InputTable<K, V> extends BaseInputTable<K, V> implements SparkColle
       MapFn mapFn = converter.applyPTypeTransforms() ? source.getType().getInputMapFn() : IdentityFn.getInstance();
       return input
           .map(new InputConverterFunction(source.getConverter()))
-          .map(new PairMapFunction(mapFn, runtime.getRuntimeContext()));
+          .mapToPair(new Tuple2MapFunction(mapFn, runtime.getRuntimeContext()));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/f12eab83/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/Tuple2MapFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/Tuple2MapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/Tuple2MapFunction.java
new file mode 100644
index 0000000..4ed553d
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/Tuple2MapFunction.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.fn;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.spark.SparkRuntimeContext;
+import org.apache.spark.api.java.function.PairFunction;
+import scala.Tuple2;
+
+public class Tuple2MapFunction<K, V> implements PairFunction<Pair<K, V>, K, V> {
+  private final MapFn<Pair<K, V>, Pair<K, V>> fn;
+  private final SparkRuntimeContext ctxt;
+  private boolean initialized;
+
+  public Tuple2MapFunction(MapFn<Pair<K, V>, Pair<K, V>> fn, SparkRuntimeContext ctxt) {
+    this.fn = fn;
+    this.ctxt = ctxt;
+  }
+
+  @Override
+  public Tuple2<K, V> call(Pair<K, V> p) throws Exception {
+    if (!initialized) {
+      ctxt.initialize(fn, null);
+      initialized = true;
+    }
+    Pair<K, V> res = fn.map(p);
+    return new Tuple2<K, V>(res.first(), res.second());
+  }
+}
