Repository: crunch
Updated Branches:
  refs/heads/master 2d20e7772 -> 435465870
CRUNCH-410: Spark 1.0.0 updates.

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/43546587
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/43546587
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/43546587

Branch: refs/heads/master
Commit: 4354658703c50f9506b30e6f0768b4a473817807
Parents: 2d20e77
Author: Josh Wills <[email protected]>
Authored: Mon Jun 2 13:53:43 2014 -0700
Committer: Josh Wills <[email protected]>
Committed: Sun Aug 24 12:54:14 2014 -0700

----------------------------------------------------------------------
 crunch-spark/pom.xml                             |  5 +
 .../org/apache/crunch/SparkTaskAttemptIT.java    | 75 +++++++++++++++
 .../crunch/fn/SDoubleFlatMapFunction.java        | 40 ++++++++
 .../org/apache/crunch/fn/SDoubleFunction.java    | 36 +++++++
 .../org/apache/crunch/fn/SFlatMapFunction.java   | 40 ++++++++
 .../org/apache/crunch/fn/SFlatMapFunction2.java  | 42 +++++++++
 .../java/org/apache/crunch/fn/SFunction.java     | 36 +++++++
 .../java/org/apache/crunch/fn/SFunction2.java    | 38 ++++++++
 .../java/org/apache/crunch/fn/SFunctions.java    | 99 ++++++++++++++++++++
 .../apache/crunch/fn/SPairFlatMapFunction.java   | 43 +++++++++
 .../org/apache/crunch/fn/SPairFunction.java      | 40 ++++++++
 .../java/org/apache/crunch/fn/SparkDoFn.java     | 33 +++++++
 .../java/org/apache/crunch/fn/SparkMapFn.java    | 33 +++++++
 .../apache/crunch/impl/spark/SparkRuntime.java   | 10 +-
 .../crunch/impl/spark/SparkRuntimeContext.java   | 23 ++++-
 .../crunch/impl/spark/collect/DoCollection.java  | 12 +--
 .../crunch/impl/spark/collect/DoTable.java       | 15 ++-
 .../impl/spark/collect/PGroupedTableImpl.java    | 12 ++-
 .../impl/spark/collect/ToByteArrayFunction.java  |  4 +-
 .../crunch/impl/spark/collect/UnionTable.java    |  2 +-
 .../impl/spark/fn/CombineMapsideFunction.java    |  4 +-
 .../crunch/impl/spark/fn/CrunchPairTuple2.java   | 38 ++++++++
 .../crunch/impl/spark/fn/FlatMapDoFn.java        | 41 --------
 .../crunch/impl/spark/fn/FlatMapIndexFn.java     | 50 ++++++++++
 .../crunch/impl/spark/fn/FlatMapPairDoFn.java    |  4 +-
 .../impl/spark/fn/InputConverterFunction.java    |  2 +-
 .../crunch/impl/spark/fn/MapFunction.java        |  4 +-
 .../crunch/impl/spark/fn/MapOutputFunction.java  |  2 +-
 .../impl/spark/fn/OutputConverterFunction.java   |  2 +-
 .../crunch/impl/spark/fn/PairFlatMapDoFn.java    |  4 +-
 .../impl/spark/fn/PairFlatMapPairDoFn.java       | 49 ----------
 .../crunch/impl/spark/fn/PairMapFunction.java    |  4 +-
 .../impl/spark/fn/PairMapIterableFunction.java   |  4 +-
 .../spark/fn/PartitionedMapOutputFunction.java   |  2 +-
 .../impl/spark/fn/ReduceGroupingFunction.java    | 14 +--
 .../impl/spark/fn/ReduceInputFunction.java       |  7 +-
 pom.xml                                          |  6 +-
 37 files changed, 726 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-spark/pom.xml b/crunch-spark/pom.xml
index dbe8169..e1a42eb 100644
--- a/crunch-spark/pom.xml
+++ b/crunch-spark/pom.xml
@@ -34,6 +34,11 @@ under the License.
<artifactId>scala-library</artifactId> </dependency> <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>14.0.1</version> + </dependency> + <dependency> <groupId>org.apache.crunch</groupId> <artifactId>crunch-core</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/it/java/org/apache/crunch/SparkTaskAttemptIT.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkTaskAttemptIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkTaskAttemptIT.java new file mode 100644 index 0000000..6ff0b18 --- /dev/null +++ b/crunch-spark/src/it/java/org/apache/crunch/SparkTaskAttemptIT.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.crunch.impl.spark.SparkPipeline; +import org.apache.crunch.io.From; +import org.apache.crunch.io.To; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.types.avro.Avros; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +public class SparkTaskAttemptIT { + @Rule + public TemporaryPath tempDir = new TemporaryPath(); + + private SparkPipeline pipeline; + + @Before + public void setUp() throws IOException { + pipeline = new SparkPipeline("local", "taskattempt"); + } + + @After + public void tearDown() throws Exception { + pipeline.done(); + } + + @Test + public void testTaskAttempts() throws Exception { + String inputPath = tempDir.copyResourceFileName("set1.txt"); + String inputPath2 = tempDir.copyResourceFileName("set2.txt"); + + PCollection<String> first = pipeline.read(From.textFile(inputPath)); + PCollection<String> second = pipeline.read(From.textFile(inputPath2)); + + Iterable<Pair<Integer, Long>> cnts = first.union(second) + .parallelDo(new TaskMapFn(), Avros.ints()) + .count() + .materialize(); + assertEquals(ImmutableSet.of(Pair.of(0, 4L), Pair.of(1, 3L)), Sets.newHashSet(cnts)); + } + + private static class TaskMapFn extends MapFn<String, Integer> { + @Override + public Integer map(String input) { + return getContext().getTaskAttemptID().getTaskID().getId(); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java ---------------------------------------------------------------------- diff --git 
a/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java new file mode 100644 index 0000000..f3f67cc --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.fn; + +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.Emitter; +import org.apache.spark.api.java.function.DoubleFlatMapFunction; + +/** + * A Crunch-compatible abstract base class for Spark's {@link DoubleFlatMapFunction}. Subclasses + * of this class may be used against either Crunch {@code PCollections} or Spark {@code RDDs}. + */ +public abstract class SDoubleFlatMapFunction<T> extends SparkDoFn<T, Double> + implements DoubleFlatMapFunction<T> { + @Override + public void process(T input, Emitter<Double> emitter) { + try { + for (Double d : call(input)) { + emitter.emit(d); + } + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFunction.java new file mode 100644 index 0000000..e32b4fa --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFunction.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.fn; + +import org.apache.crunch.CrunchRuntimeException; +import org.apache.spark.api.java.function.DoubleFunction; + +/** + * A Crunch-compatible abstract base class for Spark's {@link DoubleFunction}. Subclasses + * of this class may be used against either Crunch {@code PCollections} or Spark {@code RDDs}. 
+ */ +public abstract class SDoubleFunction<T> extends SparkMapFn<T, Double> implements DoubleFunction<T> { + @Override + public Double map(T input) { + try { + return call(input); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java new file mode 100644 index 0000000..1fecb76 --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.fn; + +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.Emitter; +import org.apache.spark.api.java.function.FlatMapFunction; + +/** + * A Crunch-compatible abstract base class for Spark's {@link FlatMapFunction}. Subclasses + * of this class may be used against either Crunch {@code PCollections} or Spark {@code RDDs}. + */ +public abstract class SFlatMapFunction<T, R> extends SparkDoFn<T, R> + implements FlatMapFunction<T, R> { + @Override + public void process(T input, Emitter<R> emitter) { + try { + for (R r : call(input)) { + emitter.emit(r); + } + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java new file mode 100644 index 0000000..0798f63 --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.fn; + +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.Pair; +import org.apache.spark.api.java.function.FlatMapFunction2; + +/** + * A Crunch-compatible abstract base class for Spark's {@link FlatMapFunction2}. Subclasses + * of this class may be used against either Crunch {@code PCollections} or Spark {@code RDDs}. + */ +public abstract class SFlatMapFunction2<K, V, R> extends DoFn<Pair<K, V>, R> + implements FlatMapFunction2<K, V, R> { + @Override + public void process(Pair<K, V> input, Emitter<R> emitter) { + try { + for (R r : call(input.first(), input.second())) { + emitter.emit(r); + } + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/fn/SFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SFunction.java new file mode 100644 index 0000000..3cacf52 --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SFunction.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.fn; + +import org.apache.crunch.CrunchRuntimeException; +import org.apache.spark.api.java.function.Function; + +/** + * A Crunch-compatible abstract base class for Spark's {@link Function}. Subclasses + * of this class may be used against either Crunch {@code PCollections} or Spark {@code RDDs}. + */ +public abstract class SFunction<T, R> extends SparkMapFn<T, R> implements Function<T, R> { + @Override + public R map(T input) { + try { + return call(input); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/fn/SFunction2.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SFunction2.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SFunction2.java new file mode 100644 index 0000000..8322026 --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SFunction2.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.fn; + +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.Pair; +import org.apache.spark.api.java.function.Function2; + +/** + * A Crunch-compatible abstract base class for Spark's {@link Function2}. Subclasses + * of this class may be used against either Crunch {@code PCollections} or Spark {@code RDDs}. + */ +public abstract class SFunction2<K, V, R> extends SparkMapFn<Pair<K, V>, R> + implements Function2<K, V, R> { + @Override + public R map(Pair<K, V> input) { + try { + return call(input.first(), input.second()); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java new file mode 100644 index 0000000..cc59746 --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.fn; + +import org.apache.spark.api.java.function.DoubleFlatMapFunction; +import org.apache.spark.api.java.function.DoubleFunction; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.FlatMapFunction2; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import scala.Tuple2; + +/** + * Utility methods for wrapping existing Spark Java API Functions for + * Crunch compatibility. 
+ */ +public final class SFunctions { + + public static <T, R> SFunction<T, R> wrap(final Function<T, R> f) { + return new SFunction<T, R>() { + @Override + public R call(T t) throws Exception { + return f.call(t); + } + }; + } + + public static <K, V, R> SFunction2<K, V, R> wrap(final Function2<K, V, R> f) { + return new SFunction2<K, V, R>() { + @Override + public R call(K k, V v) throws Exception { + return f.call(k, v); + } + }; + } + + public static <T, K, V> SPairFunction<T, K, V> wrap(final PairFunction<T, K, V> f) { + return new SPairFunction<T, K, V>() { + @Override + public Tuple2<K, V> call(T t) throws Exception { + return f.call(t); + } + }; + } + + public static <T, R> SFlatMapFunction<T, R> wrap(final FlatMapFunction<T, R> f) { + return new SFlatMapFunction<T, R>() { + @Override + public Iterable<R> call(T t) throws Exception { + return f.call(t); + } + }; + } + + public static <K, V, R> SFlatMapFunction2<K, V, R> wrap(final FlatMapFunction2<K, V, R> f) { + return new SFlatMapFunction2<K, V, R>() { + @Override + public Iterable<R> call(K k, V v) throws Exception { + return f.call(k, v); + } + }; + } + + public static <T> SDoubleFunction<T> wrap(final DoubleFunction<T> f) { + return new SDoubleFunction<T>() { + @Override + public double call(T t) throws Exception { + return f.call(t); + } + }; + } + + public static <T> SDoubleFlatMapFunction<T> wrap(final DoubleFlatMapFunction<T> f) { + return new SDoubleFlatMapFunction<T>() { + @Override + public Iterable<Double> call(T t) throws Exception { + return f.call(t); + } + }; + } + + private SFunctions() {} +} http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java new file mode 100644 index 0000000..3b8e75a --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.fn; + +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.Emitter; +import org.apache.crunch.Pair; +import org.apache.spark.api.java.function.PairFlatMapFunction; +import scala.Tuple2; + +/** + * A Crunch-compatible abstract base class for Spark's {@link PairFlatMapFunction}. Subclasses + * of this class may be used against either Crunch {@code PCollections} or Spark {@code RDDs}. 
+ */ +public abstract class SPairFlatMapFunction<T, K, V> extends SparkDoFn<T, Pair<K, V>> + implements PairFlatMapFunction<T, K, V> { + @Override + public void process(T input, Emitter<Pair<K, V>> emitter) { + try { + for (Tuple2<K, V> kv : call(input)) { + emitter.emit(Pair.of(kv._1(), kv._2())); + } + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFunction.java new file mode 100644 index 0000000..9d9317c --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFunction.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.fn; + +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.Pair; +import org.apache.spark.api.java.function.PairFunction; +import scala.Tuple2; + +/** + * A Crunch-compatible abstract base class for Spark's {@link PairFunction}. Subclasses + * of this class may be used against either Crunch {@code PCollections} or Spark {@code RDDs}. + */ +public abstract class SPairFunction<T, K, V> extends SparkMapFn<T, Pair<K, V>> + implements PairFunction<T, K, V> { + @Override + public Pair<K, V> map(T input) { + try { + Tuple2<K, V> t = call(input); + return t == null ? null : Pair.of(t._1(), t._2()); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/fn/SparkDoFn.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SparkDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SparkDoFn.java new file mode 100644 index 0000000..05adfa7 --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SparkDoFn.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.fn; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; + +abstract class SparkDoFn<T, R> extends DoFn<T, R> { + @Override + public final void initialize() { + // Forced no-op for Spark compatibility + } + + @Override + public final void cleanup(Emitter<R> emitter) { + // Forced no-op for Spark compatibility + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/fn/SparkMapFn.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SparkMapFn.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SparkMapFn.java new file mode 100644 index 0000000..1f90b55 --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SparkMapFn.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.fn; + +import org.apache.crunch.Emitter; +import org.apache.crunch.MapFn; + +abstract class SparkMapFn<T, R> extends MapFn<T, R> { + @Override + public final void initialize() { + // Forced no-op for Spark compatibility + } + + @Override + public final void cleanup(Emitter<R> emitter) { + // Forced no-op for Spark compatibility + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java index b5bbc8d..687274a 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java @@ -121,7 +121,8 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe this.conf = conf; this.counters = sparkContext.accumulator(Maps.<String, Map<String, Long>>newHashMap(), new CounterAccumulatorParam()); - this.ctxt = new SparkRuntimeContext(counters, sparkContext.broadcast(WritableUtils.toByteArray(conf))); + this.ctxt = new SparkRuntimeContext(sparkContext.appName(), counters, + sparkContext.broadcast(WritableUtils.toByteArray(conf))); this.outputTargets = Maps.newTreeMap(DEPTH_COMPARATOR); this.outputTargets.putAll(outputTargets); this.toMaterialize = toMaterialize; @@ -305,11 +306,11 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe if (rdd instanceof JavaRDD) { outRDD = ((JavaRDD) rdd) .map(new MapFunction(c.applyPTypeTransforms() ? ptype.getOutputMapFn() : ident, ctxt)) - .map(new OutputConverterFunction(c)); + .mapToPair(new OutputConverterFunction(c)); } else { outRDD = ((JavaPairRDD) rdd) .map(new PairMapFunction(c.applyPTypeTransforms() ? 
ptype.getOutputMapFn() : ident, ctxt)) - .map(new OutputConverterFunction(c)); + .mapToPair(new OutputConverterFunction(c)); } try { Job job = new Job(conf); @@ -368,7 +369,8 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe private Counters getCounters() { Counters c = new Counters(); - for (Map.Entry<String, Map<String, Long>> e : counters.value().entrySet()) { + Map<String, Map<String, Long>> values = counters.value(); + for (Map.Entry<String, Map<String, Long>> e : values.entrySet()) { CounterGroup cg = c.getGroup(e.getKey()); for (Map.Entry<String, Long> f : e.getValue().entrySet()) { cg.findCounter(f.getKey()).setValue(f.getValue()); http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java index ca52c29..44d3573 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java @@ -18,6 +18,7 @@ package org.apache.crunch.impl.spark; import com.google.common.base.Joiner; +import com.google.common.base.Objects; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.ByteStreams; @@ -28,8 +29,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.mapred.SparkCounter; import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.StatusReporter; import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskInputOutputContext; import org.apache.spark.Accumulator; import org.apache.spark.SparkFiles; @@ -44,14 +47,18 @@ import java.util.Map; public class SparkRuntimeContext implements Serializable { + private String jobName; private Broadcast<byte[]> broadConf; private final Accumulator<Map<String, Map<String, Long>>> counters; private transient Configuration conf; private transient TaskInputOutputContext context; + private transient Integer lastTID; public SparkRuntimeContext( + String jobName, Accumulator<Map<String, Map<String, Long>>> counters, Broadcast<byte[]> broadConf) { + this.jobName = jobName; this.counters = counters; this.broadConf = broadConf; } @@ -61,11 +68,19 @@ public class SparkRuntimeContext implements Serializable { this.conf = null; } - public void initialize(DoFn<?, ?> fn) { - if (context == null) { + public void initialize(DoFn<?, ?> fn, Integer tid) { + if (context == null || !Objects.equal(lastTID, tid)) { + TaskAttemptID attemptID; + if (tid != null) { + TaskID taskId = new TaskID(new JobID(jobName, 0), false, tid); + attemptID = new TaskAttemptID(taskId, 0); + lastTID = tid; + } else { + attemptID = new TaskAttemptID(); + lastTID = null; + } configureLocalFiles(); - context = TaskInputOutputContextFactory.create(getConfiguration(), new TaskAttemptID(), - new SparkReporter(counters)); + context = TaskInputOutputContextFactory.create(getConfiguration(), attemptID, new SparkReporter(counters)); } fn.setContext(context); fn.initialize(); 
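
Note: with the SparkRuntimeContext change above, the Spark partition index is now turned into a Hadoop TaskAttemptID, so a Crunch DoFn running on Spark can read its task id just as it would on MapReduce. A minimal usage sketch, assuming a PCollection<String> named "lines" that is not part of this patch:

    // Illustrative only: "lines" is an assumed PCollection<String>.
    PCollection<Integer> taskIds = lines.parallelDo(new MapFn<String, Integer>() {
      @Override
      public Integer map(String input) {
        // Backed by the TaskAttemptID built from the Spark partition id above.
        return getContext().getTaskAttemptID().getTaskID().getId();
      }
    }, Avros.ints());

This is the same pattern exercised by TaskMapFn in the new SparkTaskAttemptIT integration test.
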
http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoCollection.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoCollection.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoCollection.java index 72888b8..6db14f2 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoCollection.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoCollection.java @@ -23,11 +23,9 @@ import org.apache.crunch.impl.dist.collect.BaseDoCollection; import org.apache.crunch.impl.dist.collect.PCollectionImpl; import org.apache.crunch.impl.spark.SparkCollection; import org.apache.crunch.impl.spark.SparkRuntime; -import org.apache.crunch.impl.spark.fn.FlatMapDoFn; -import org.apache.crunch.impl.spark.fn.FlatMapPairDoFn; +import org.apache.crunch.impl.spark.fn.FlatMapIndexFn; import org.apache.crunch.types.PType; import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDDLike; import org.apache.spark.storage.StorageLevel; @@ -56,10 +54,8 @@ public class DoCollection<S> extends BaseDoCollection<S> implements SparkCollect private JavaRDDLike<?, ?> getJavaRDDLikeInternal(SparkRuntime runtime) { JavaRDDLike<?, ?> parentRDD = ((SparkCollection) getOnlyParent()).getJavaRDDLike(runtime); fn.configure(runtime.getConfiguration()); - if (parentRDD instanceof JavaRDD) { - return ((JavaRDD) parentRDD).mapPartitions(new FlatMapDoFn(fn, runtime.getRuntimeContext())); - } else { - return ((JavaPairRDD) parentRDD).mapPartitions(new FlatMapPairDoFn(fn, runtime.getRuntimeContext())); - } + return parentRDD.mapPartitionsWithIndex( + new FlatMapIndexFn(fn, parentRDD instanceof JavaPairRDD, runtime.getRuntimeContext()), + false); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoTable.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoTable.java index c0e4bb1..f593590 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoTable.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoTable.java @@ -25,11 +25,10 @@ import org.apache.crunch.impl.dist.collect.BaseDoTable; import org.apache.crunch.impl.dist.collect.PCollectionImpl; import org.apache.crunch.impl.spark.SparkCollection; import org.apache.crunch.impl.spark.SparkRuntime; -import org.apache.crunch.impl.spark.fn.PairFlatMapDoFn; -import org.apache.crunch.impl.spark.fn.PairFlatMapPairDoFn; +import org.apache.crunch.impl.spark.fn.CrunchPairTuple2; +import org.apache.crunch.impl.spark.fn.FlatMapIndexFn; import org.apache.crunch.types.PTableType; import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDDLike; import org.apache.spark.storage.StorageLevel; @@ -69,10 +68,10 @@ public class DoTable<K, V> extends BaseDoTable<K, V> implements SparkCollection } JavaRDDLike<?, ?> parentRDD = ((SparkCollection) getOnlyParent()).getJavaRDDLike(runtime); fn.configure(runtime.getConfiguration()); - if (parentRDD instanceof JavaRDD) { - return ((JavaRDD) parentRDD).mapPartitions(new PairFlatMapDoFn(fn, 
runtime.getRuntimeContext())); - } else { - return ((JavaPairRDD) parentRDD).mapPartitions(new PairFlatMapPairDoFn(fn, runtime.getRuntimeContext())); - } + return parentRDD + .mapPartitionsWithIndex( + new FlatMapIndexFn(fn, parentRDD instanceof JavaPairRDD, runtime.getRuntimeContext()), + false) + .mapPartitionsToPair(new CrunchPairTuple2()); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java index 4de50b8..5e6594e 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java @@ -83,7 +83,8 @@ public class PGroupedTableImpl<K, V> extends BaseGroupedTable<K, V> implements S private JavaRDDLike<?, ?> getJavaRDDLikeInternal(SparkRuntime runtime, CombineFn<K, V> combineFn) { JavaPairRDD<K, V> parentRDD = (JavaPairRDD<K, V>) ((SparkCollection)getOnlyParent()).getJavaRDDLike(runtime); if (combineFn != null) { - parentRDD = parentRDD.mapPartitions(new CombineMapsideFunction<K, V>(combineFn, runtime.getRuntimeContext())); + parentRDD = parentRDD.mapPartitionsToPair( + new CombineMapsideFunction<K, V>(combineFn, runtime.getRuntimeContext())); } SerDe keySerde, valueSerde; PTableType<K, V> parentType = ptype.getTableType(); @@ -106,13 +107,14 @@ public class PGroupedTableImpl<K, V> extends BaseGroupedTable<K, V> implements S if (groupingOptions.getPartitionerClass() != null) { groupedRDD = parentRDD .map(new PairMapFunction(ptype.getOutputMapFn(), runtime.getRuntimeContext())) - .map(new PartitionedMapOutputFunction(keySerde, valueSerde, ptype, groupingOptions.getPartitionerClass(), + .mapToPair( + new PartitionedMapOutputFunction(keySerde, valueSerde, ptype, groupingOptions.getPartitionerClass(), numPartitions, runtime.getRuntimeContext())) .groupByKey(new SparkPartitioner(numPartitions)); } else { groupedRDD = parentRDD .map(new PairMapFunction(ptype.getOutputMapFn(), runtime.getRuntimeContext())) - .map(new MapOutputFunction(keySerde, valueSerde)) + .mapToPair(new MapOutputFunction(keySerde, valueSerde)) .groupByKey(numPartitions); } @@ -121,12 +123,12 @@ public class PGroupedTableImpl<K, V> extends BaseGroupedTable<K, V> implements S groupedRDD = groupedRDD.sortByKey(scmp); } if (groupingOptions.getGroupingComparatorClass() != null) { - groupedRDD = groupedRDD.mapPartitions( + groupedRDD = groupedRDD.mapPartitionsToPair( new ReduceGroupingFunction(groupingOptions, ptype, runtime.getRuntimeContext())); } return groupedRDD .map(new ReduceInputFunction(keySerde, valueSerde)) - .map(new PairMapIterableFunction(ptype.getInputMapFn(), runtime.getRuntimeContext())); + .mapToPair(new PairMapIterableFunction(ptype.getInputMapFn(), runtime.getRuntimeContext())); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/ToByteArrayFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/ToByteArrayFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/ToByteArrayFunction.java index 4761754..9d2a10c 100644 --- 
a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/ToByteArrayFunction.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/ToByteArrayFunction.java @@ -25,9 +25,9 @@ import scala.Tuple2; import java.util.List; -public class ToByteArrayFunction extends PairFunction<Tuple2<IntByteArray, List<byte[]>>, ByteArray, List<byte[]>> { +public class ToByteArrayFunction implements PairFunction<Tuple2<IntByteArray, List<byte[]>>, ByteArray, List<byte[]>> { @Override public Tuple2<ByteArray, List<byte[]>> call(Tuple2<IntByteArray, List<byte[]>> t) throws Exception { - return new Tuple2<ByteArray, List<byte[]>>(t._1, t._2); + return new Tuple2<ByteArray, List<byte[]>>(t._1(), t._2()); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java index b2776c5..826e5c4 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java @@ -59,7 +59,7 @@ public class UnionTable<K, V> extends BaseUnionTable<K, V> implements SparkColle rdds[i] = (JavaPairRDD) ((SparkCollection) parents.get(i)).getJavaRDDLike(runtime); } else { JavaRDD rdd = (JavaRDD) ((SparkCollection) parents.get(i)).getJavaRDDLike(runtime); - rdds[i] = rdd.mapPartitions(new PairFlatMapDoFn(IdentityFn.getInstance(), runtime.getRuntimeContext())); + rdds[i] = rdd.mapPartitionsToPair(new PairFlatMapDoFn(IdentityFn.getInstance(), runtime.getRuntimeContext())); } } return runtime.getSparkContext().union(rdds); http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java index 3cc8e05..1bea08d 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java @@ -32,7 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -public class CombineMapsideFunction<K, V> extends PairFlatMapFunction<Iterator<Tuple2<K, V>>, K, V> { +public class CombineMapsideFunction<K, V> implements PairFlatMapFunction<Iterator<Tuple2<K, V>>, K, V> { private static final int REDUCE_EVERY_N = 50000; @@ -46,7 +46,7 @@ public class CombineMapsideFunction<K, V> extends PairFlatMapFunction<Iterator<T @Override public Iterable<Tuple2<K, V>> call(Iterator<Tuple2<K, V>> iter) throws Exception { - ctxt.initialize(combineFn); + ctxt.initialize(combineFn, null); Map<K, List<V>> cache = Maps.newHashMap(); int cnt = 0; while (iter.hasNext()) { http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java new 
file mode 100644 index 0000000..d6c544c --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.spark.fn; + +import com.google.common.collect.Iterators; +import org.apache.crunch.Pair; +import org.apache.crunch.impl.spark.GuavaUtils; +import org.apache.spark.api.java.function.PairFlatMapFunction; +import scala.Tuple2; + +import java.util.Iterator; + +public class CrunchPairTuple2<K, V> implements PairFlatMapFunction<Iterator<Pair<K, V>>, K, V> { + @Override + public Iterable<Tuple2<K, V>> call(final Iterator<Pair<K, V>> iterator) throws Exception { + return new Iterable<Tuple2<K, V>>() { + @Override + public Iterator<Tuple2<K, V>> iterator() { + return Iterators.transform(iterator, GuavaUtils.<K, V>pair2tupleFunc()); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapDoFn.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapDoFn.java deleted file mode 100644 index cfb6d42..0000000 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapDoFn.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.crunch.impl.spark.fn; - -import org.apache.crunch.DoFn; -import org.apache.crunch.impl.spark.SparkRuntimeContext; -import org.apache.spark.api.java.function.FlatMapFunction; - -import java.util.Iterator; - -public class FlatMapDoFn<S, T> extends FlatMapFunction<Iterator<S>, T> { - private final DoFn<S, T> fn; - private final SparkRuntimeContext ctxt; - - public FlatMapDoFn(DoFn<S, T> fn, SparkRuntimeContext ctxt) { - this.fn = fn; - this.ctxt = ctxt; - } - - @Override - public Iterable<T> call(Iterator<S> input) throws Exception { - ctxt.initialize(fn); - return new CrunchIterable<S, T>(fn, input); - } - -} http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapIndexFn.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapIndexFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapIndexFn.java new file mode 100644 index 0000000..4eb5884 --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapIndexFn.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.spark.fn; + +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import org.apache.crunch.DoFn; +import org.apache.crunch.Pair; +import org.apache.crunch.impl.spark.GuavaUtils; +import org.apache.crunch.impl.spark.SparkRuntimeContext; +import org.apache.crunch.util.DoFnIterator; +import org.apache.spark.api.java.function.Function2; +import scala.Tuple2; + +import javax.annotation.Nullable; +import java.util.Iterator; + +public class FlatMapIndexFn<S, T> implements Function2<Integer, Iterator, Iterator<T>> { + private final DoFn<S, T> fn; + private final boolean convertInput; + private final SparkRuntimeContext ctxt; + + public FlatMapIndexFn(DoFn<S, T> fn, boolean convertInput, SparkRuntimeContext ctxt) { + this.fn = fn; + this.convertInput = convertInput; + this.ctxt = ctxt; + } + + @Override + public Iterator<T> call(Integer partitionId, Iterator input) throws Exception { + ctxt.initialize(fn, partitionId); + Iterator in = convertInput ? 
Iterators.transform(input, GuavaUtils.tuple2PairFunc()) : input; + return new DoFnIterator<S, T>(in, fn); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java index 30aef83..8ec2834 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java @@ -27,7 +27,7 @@ import scala.Tuple2; import java.util.Iterator; -public class FlatMapPairDoFn<K, V, T> extends FlatMapFunction<Iterator<Tuple2<K, V>>, T> { +public class FlatMapPairDoFn<K, V, T> implements FlatMapFunction<Iterator<Tuple2<K, V>>, T> { private final DoFn<Pair<K, V>, T> fn; private final SparkRuntimeContext ctxt; @@ -38,7 +38,7 @@ public class FlatMapPairDoFn<K, V, T> extends FlatMapFunction<Iterator<Tuple2<K, @Override public Iterable<T> call(Iterator<Tuple2<K, V>> input) throws Exception { - ctxt.initialize(fn); + ctxt.initialize(fn, null); return new CrunchIterable<Pair<K, V>, T>(fn, Iterators.transform(input, GuavaUtils.<K, V>tuple2PairFunc())); } http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java index 36745c1..d231527 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java @@ -21,7 +21,7 @@ import org.apache.crunch.types.Converter; import org.apache.spark.api.java.function.Function; import scala.Tuple2; -public class InputConverterFunction<K, V, S> extends Function<Tuple2<K, V>, S> { +public class InputConverterFunction<K, V, S> implements Function<Tuple2<K, V>, S> { private Converter<K, V, S, ?> converter; public InputConverterFunction(Converter<K, V, S, ?> converter) { http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapFunction.java index b08aaeb..f611b4b 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapFunction.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapFunction.java @@ -21,7 +21,7 @@ import org.apache.crunch.MapFn; import org.apache.crunch.impl.spark.SparkRuntimeContext; import org.apache.spark.api.java.function.Function; -public class MapFunction extends Function<Object, Object> { +public class MapFunction implements Function<Object, Object> { private final MapFn fn; private final SparkRuntimeContext ctxt; private boolean initialized; @@ -34,7 +34,7 @@ public class MapFunction extends Function<Object, Object> { @Override public Object call(Object o) throws Exception { if (!initialized) { - ctxt.initialize(fn); + ctxt.initialize(fn, 
null); initialized = true; } return fn.map(o); http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java index 47ef752..b8cd7c6 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java @@ -23,7 +23,7 @@ import org.apache.crunch.impl.spark.serde.SerDe; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; -public class MapOutputFunction<K, V> extends PairFunction<Pair<K, V>, ByteArray, byte[]> { +public class MapOutputFunction<K, V> implements PairFunction<Pair<K, V>, ByteArray, byte[]> { private final SerDe keySerde; private final SerDe valueSerde; http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/OutputConverterFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/OutputConverterFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/OutputConverterFunction.java index b1184d8..0e611b0 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/OutputConverterFunction.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/OutputConverterFunction.java @@ -21,7 +21,7 @@ import org.apache.crunch.types.Converter; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; -public class OutputConverterFunction<K, V, S> extends PairFunction<S, K, V> { +public class OutputConverterFunction<K, V, S> implements PairFunction<S, K, V> { private Converter<K, V, S, ?> converter; public OutputConverterFunction(Converter<K, V, S, ?> converter) { http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java index b2d93a0..7f289cc 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java @@ -27,7 +27,7 @@ import scala.Tuple2; import java.util.Iterator; -public class PairFlatMapDoFn<T, K, V> extends PairFlatMapFunction<Iterator<T>, K, V> { +public class PairFlatMapDoFn<T, K, V> implements PairFlatMapFunction<Iterator<T>, K, V> { private final DoFn<T, Pair<K, V>> fn; private final SparkRuntimeContext ctxt; @@ -38,7 +38,7 @@ public class PairFlatMapDoFn<T, K, V> extends PairFlatMapFunction<Iterator<T>, K @Override public Iterable<Tuple2<K, V>> call(Iterator<T> input) throws Exception { - ctxt.initialize(fn); + ctxt.initialize(fn, null); return Iterables.transform( new CrunchIterable<T, Pair<K, V>>(fn, input), GuavaUtils.<K, V>pair2tupleFunc()); http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapPairDoFn.java ---------------------------------------------------------------------- diff --git 
http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapPairDoFn.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapPairDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapPairDoFn.java
deleted file mode 100644
index bc3e701..0000000
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapPairDoFn.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.spark.fn;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.impl.spark.GuavaUtils;
-import org.apache.crunch.impl.spark.SparkRuntimeContext;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import scala.Tuple2;
-
-import java.util.Iterator;
-
-public class PairFlatMapPairDoFn<K, V, K2, V2> extends PairFlatMapFunction<Iterator<Tuple2<K, V>>, K2, V2> {
-  private final DoFn<Pair<K, V>, Pair<K2, V2>> fn;
-  private final SparkRuntimeContext ctxt;
-
-  public PairFlatMapPairDoFn(DoFn<Pair<K, V>, Pair<K2, V2>> fn, SparkRuntimeContext ctxt) {
-    this.fn = fn;
-    this.ctxt = ctxt;
-  }
-
-  @Override
-  public Iterable<Tuple2<K2, V2>> call(Iterator<Tuple2<K, V>> input) throws Exception {
-    ctxt.initialize(fn);
-    return Iterables.transform(
-        new CrunchIterable<Pair<K, V>, Pair<K2, V2>>(
-            fn,
-            Iterators.transform(input, GuavaUtils.<K, V>tuple2PairFunc())),
-        GuavaUtils.<K2, V2>pair2tupleFunc());
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java
index 673bbab..652452c 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java
@@ -23,7 +23,7 @@ import org.apache.crunch.impl.spark.SparkRuntimeContext;
 import org.apache.spark.api.java.function.Function;
 import scala.Tuple2;

-public class PairMapFunction<K, V, S> extends Function<Tuple2<K, V>, S> {
+public class PairMapFunction<K, V, S> implements Function<Tuple2<K, V>, S> {
   private final MapFn<Pair<K, V>, S> fn;
   private final SparkRuntimeContext ctxt;
   private boolean initialized;
@@ -36,7 +36,7 @@ public class PairMapFunction<K, V, S> extends Function<Tuple2<K, V>, S> {
   @Override
   public S call(Tuple2<K, V> kv) throws Exception {
     if (!initialized) {
-      ctxt.initialize(fn);
+      ctxt.initialize(fn, null);
       initialized = true;
     }
     return fn.map(Pair.of(kv._1(), kv._2()));
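The kv._1() and kv._2() calls above read Tuple2 components through Scala's accessor methods; the same pattern is applied to the reduce-side functions later in this patch. A tiny illustrative example (names are made up, not from the patch):

    import scala.Tuple2;

    // Illustrative sketch: from Java, Tuple2 components are read via the
    // _1() and _2() accessor methods.
    public class Tuple2AccessSketch {
      public static void main(String[] args) {
        Tuple2<String, Integer> kv = new Tuple2<String, Integer>("key", 42);
        System.out.println(kv._1() + " -> " + kv._2());
      }
    }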
http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapIterableFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapIterableFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapIterableFunction.java
index 7bfe378..f36070f 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapIterableFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapIterableFunction.java
@@ -26,7 +26,7 @@ import scala.Tuple2;

 import java.util.List;

-public class PairMapIterableFunction<K, V, S, T> extends PairFunction<Pair<K, List<V>>, S, Iterable<T>> {
+public class PairMapIterableFunction<K, V, S, T> implements PairFunction<Pair<K, List<V>>, S, Iterable<T>> {
   private final MapFn<Pair<K, List<V>>, Pair<S, Iterable<T>>> fn;
   private final SparkRuntimeContext runtimeContext;

@@ -42,7 +42,7 @@ public class PairMapIterableFunction<K, V, S, T> extends PairFunction<Pair<K, Li
   @Override
   public Tuple2<S, Iterable<T>> call(Pair<K, List<V>> input) throws Exception {
     if (!initialized) {
-      runtimeContext.initialize(fn);
+      runtimeContext.initialize(fn, null);
       initialized = true;
     }
     Pair<S, Iterable<T>> out = fn.map(input);

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java
index a10b7f6..e88217d 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java
@@ -34,7 +34,7 @@ import scala.Tuple2;

 import java.io.IOException;

-public class PartitionedMapOutputFunction<K, V> extends PairFunction<Pair<K, V>, IntByteArray, byte[]> {
+public class PartitionedMapOutputFunction<K, V> implements PairFunction<Pair<K, V>, IntByteArray, byte[]> {
   private final SerDe<K> keySerde;
   private final SerDe<V> valueSerde;
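Like Function, PairFunction is an interface in Spark 1.0; its single call(T) method returns a Tuple2, which is what MapOutputFunction and PartitionedMapOutputFunction now implement. A minimal illustrative implementation (not taken from this patch):

    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    // Illustrative sketch: a Spark 1.0 PairFunction maps one input value to a
    // single key/value Tuple2.
    public class WordLengthPairFn implements PairFunction<String, String, Integer> {
      @Override
      public Tuple2<String, Integer> call(String word) throws Exception {
        return new Tuple2<String, Integer>(word, word.length());
      }
    }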
http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java
index 35dd7dd..d3dd69e 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java
@@ -33,7 +33,7 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;

-public class ReduceGroupingFunction extends PairFlatMapFunction<Iterator<Tuple2<ByteArray, List<byte[]>>>,
+public class ReduceGroupingFunction implements PairFlatMapFunction<Iterator<Tuple2<ByteArray, List<byte[]>>>,
     ByteArray, List<byte[]>> {

   private final GroupingOptions options;
@@ -97,13 +97,13 @@ public class ReduceGroupingFunction extends PairFlatMapFunction<Iterator<Tuple2<
       while (iter.hasNext()) {
         Tuple2<ByteArray, List<byte[]>> t = iter.next();
         if (key == null) {
-          key = t._1;
-          bytes.addAll(t._2);
-        } else if (cmp.compare(key.value, 0, key.value.length, t._1.value, 0, t._1.value.length) == 0) {
-          bytes.addAll(t._2);
+          key = t._1();
+          bytes.addAll(t._2());
+        } else if (cmp.compare(key.value, 0, key.value.length, t._1().value, 0, t._1().value.length) == 0) {
+          bytes.addAll(t._2());
         } else {
-          nextKey = t._1;
-          next = Lists.newArrayList(t._2);
+          nextKey = t._1();
+          next = Lists.newArrayList(t._2());
           break;
         }
       }

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceInputFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceInputFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceInputFunction.java
index 4ebdfaa..11e3bde 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceInputFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceInputFunction.java
@@ -19,6 +19,7 @@
  */
 package org.apache.crunch.impl.spark.fn;

+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import org.apache.crunch.Pair;
 import org.apache.crunch.impl.spark.ByteArray;
@@ -28,7 +29,7 @@ import scala.Tuple2;

 import java.util.List;

-public class ReduceInputFunction<K, V> extends Function<Tuple2<ByteArray, List<byte[]>>, Pair<K, List<V>>> {
+public class ReduceInputFunction<K, V> implements Function<Tuple2<ByteArray, Iterable<byte[]>>, Pair<K, Iterable<V>>> {
   private final SerDe<K> keySerDe;
   private final SerDe<V> valueSerDe;

@@ -38,7 +39,7 @@
   }

   @Override
-  public Pair<K, List<V>> call(Tuple2<ByteArray, List<byte[]>> kv) throws Exception {
-    return Pair.of(keySerDe.fromBytes(kv._1.value), Lists.transform(kv._2, valueSerDe.fromBytesFunction()));
+  public Pair<K, Iterable<V>> call(Tuple2<ByteArray, Iterable<byte[]>> kv) throws Exception {
+    return Pair.of(keySerDe.fromBytes(kv._1().value), Iterables.transform(kv._2(), valueSerDe.fromBytesFunction()));
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d86df7e..8e2f5a0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,7 +98,7 @@ under the License.
     <scala.base.version>2.10</scala.base.version>
     <scala.version>2.10.4</scala.version>
     <scalatest.version>1.9.1</scalatest.version>
-    <spark.version>0.9.1</spark.version>
+    <spark.version>1.0.0</spark.version>
     <jsr305.version>1.3.9</jsr305.version>
   </properties>

@@ -239,6 +239,10 @@ under the License.
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-core</artifactId>
        </exclusion>
+       <exclusion>
+         <groupId>org.apache.avro</groupId>
+         <artifactId>avro-ipc</artifactId>
+       </exclusion>
      </exclusions>
   </dependency>
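The ReduceInputFunction signature change above tracks another Spark 1.0 API change: JavaPairRDD.groupByKey() now produces Iterable<V> values instead of List<V>, hence the switch from Lists.transform to Iterables.transform. A rough, self-contained sketch of the new behaviour (class and variable names are assumptions for illustration, not from this patch):

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    // Illustrative sketch: under Spark 1.0, groupByKey() yields Iterable<V>
    // values, so downstream code transforms them lazily rather than assuming
    // List semantics.
    public class GroupByKeyIterableSketch {
      public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
            new SparkConf().setMaster("local").setAppName("groupByKey-sketch"));
        JavaPairRDD<String, Integer> pairs = jsc.parallelizePairs(Arrays.asList(
            new Tuple2<String, Integer>("a", 1),
            new Tuple2<String, Integer>("a", 2),
            new Tuple2<String, Integer>("b", 3)));
        // In Spark 0.9.x this was JavaPairRDD<String, List<Integer>>.
        JavaPairRDD<String, Iterable<Integer>> grouped = pairs.groupByKey();
        System.out.println(grouped.collectAsMap());
        jsc.stop();
      }
    }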
