Repository: crunch Updated Branches: refs/heads/master f8920d355 -> 7d7af4ef4
http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java ---------------------------------------------------------------------- diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java new file mode 100644 index 0000000..0b4e4fa --- /dev/null +++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.lambda; + +import org.apache.crunch.GroupingOptions; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Target; +import org.apache.crunch.lambda.fn.SFunction; +import org.apache.crunch.lambda.fn.SPredicate; +import org.apache.crunch.lib.join.DefaultJoinStrategy; +import org.apache.crunch.lib.join.JoinStrategy; +import org.apache.crunch.lib.join.JoinType; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; + +import java.util.Collection; + +/** + * Java 8 friendly version of the {@link PTable} interface, allowing distributed operations to be expressed in + * terms of lambda expressions and method references, instead of creating a new class implementation for each operation. + * @param <K> key type for this table + * @param <V> value type for this table + */ +public interface LTable<K, V> extends LCollection<Pair<K, V>> { + /** + * Get the underlying {@link PTable} for this LTable + */ + PTable<K, V> underlying(); + + /** + * Group this table by key to yield a {@link LGroupedTable} + */ + default LGroupedTable<K, V> groupByKey() { + return factory().wrap(underlying().groupByKey()); + } + + /** + * Group this table by key to yield a {@link LGroupedTable}, passing {@code numReducers} to the underlying {@link PTable} + */ + default LGroupedTable<K, V> groupByKey(int numReducers) { + return factory().wrap(underlying().groupByKey(numReducers)); + } + + /** + * Group this table by key to yield a {@link LGroupedTable}, passing the supplied {@link GroupingOptions} to the underlying {@link PTable} + */ + default LGroupedTable<K, V> groupByKey(GroupingOptions opts) { + return factory().wrap(underlying().groupByKey(opts)); + } + + /** + * Get an {@link LCollection} containing just the keys from this table + */ + default LCollection<K> keys() { + return factory().wrap(underlying().keys()); + } + + /** + * Get an {@link LCollection} containing just the values from this table + */ + default LCollection<V> values() { + return factory().wrap(underlying().values()); + } + + /** + * Transform the keys of this table
using the given function + */ + default <T> LTable<T, V> mapKeys(SFunction<K, T> fn, PType<T> pType) { + return parallelDo( + ctx -> ctx.emit(Pair.of(fn.apply(ctx.element().first()), ctx.element().second())), + ptf().tableOf(pType, valueType())); + } + + /** + * Transform the values of this table using the given function + */ + default <T> LTable<K, T> mapValues(SFunction<V, T> fn, PType<T> pType) { + return parallelDo( + ctx -> ctx.emit(Pair.of(ctx.element().first(), fn.apply(ctx.element().second()))), + ptf().tableOf(keyType(), pType)); + } + + /** + * Join this table to another {@link LTable} which has the same key type using the provided {@link JoinType} and + * {@link JoinStrategy} + */ + default <U> LTable<K, Pair<V, U>> join(LTable<K, U> other, JoinType joinType, JoinStrategy<K, V, U> joinStrategy) { + return factory().wrap(joinStrategy.join(underlying(), other.underlying(), joinType)); + } + + /** + * Join this table to another {@link LTable} which has the same key type using the provided {@link JoinType} and + * the {@link DefaultJoinStrategy} (reduce-side join). + */ + default <U> LTable<K, Pair<V, U>> join(LTable<K, U> other, JoinType joinType) { + return join(other, joinType, new DefaultJoinStrategy<>()); + } + + /** + * Inner join this table to another {@link LTable} which has the same key type using a reduce-side join + */ + default <U> LTable<K, Pair<V, U>> join(LTable<K, U> other) { + return join(other, JoinType.INNER_JOIN); + } + + /** + * Cogroup this table with another {@link LTable} with the same key type, collecting the values from + * each side.
+ */ + default <U> LTable<K, Pair<Collection<V>, Collection<U>>> cogroup(LTable<K, U> other) { + return factory().wrap(underlying().cogroup(other.underlying())); + } + + /** + * Get the underlying {@link PTableType} used to serialize key/value pairs in this table + */ + default PTableType<K, V> pType() { return underlying().getPTableType(); } + + /** + * Get a {@link PType} which can be used to serialize the key part of this table + */ + default PType<K> keyType() { + return underlying().getKeyType(); + } + + /** + * Get a {@link PType} which can be used to serialize the value part of this table + */ + default PType<V> valueType() { + return underlying().getValueType(); + } + + /** + * Write this table to the {@link Target} supplied. + */ + default LTable<K, V> write(Target target) { + underlying().write(target); + return this; + } + + /** + * Write this table to the {@link Target} supplied. + */ + default LTable<K, V> write(Target target, Target.WriteMode writeMode) { + underlying().write(target, writeMode); + return this; + } + + /** {@inheritDoc} */ + default LTable<K, V> increment(Enum<?> counter) { + return parallelDo(ctx -> ctx.increment(counter), pType()); + } + + /** {@inheritDoc} */ + default LTable<K, V> increment(String counterGroup, String counterName) { + return parallelDo(ctx -> ctx.increment(counterGroup, counterName), pType()); + } + + /** {@inheritDoc} */ + default LTable<K, V> incrementIf(Enum<?> counter, SPredicate<Pair<K, V>> condition) { + return parallelDo(ctx -> { + if (condition.test(ctx.element())) ctx.increment(counter); + }, pType()); + } + + /** {@inheritDoc} */ + default LTable<K, V> incrementIf(String counterGroup, String counterName, SPredicate<Pair<K, V>> condition) { + return parallelDo(ctx -> { + if (condition.test(ctx.element())) ctx.increment(counterGroup, counterName); + }, pType()); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/Lambda.java 
---------------------------------------------------------------------- diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/Lambda.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/Lambda.java new file mode 100644 index 0000000..07fad2b --- /dev/null +++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/Lambda.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lambda; + +import org.apache.crunch.PCollection; +import org.apache.crunch.PGroupedTable; +import org.apache.crunch.PTable; + +/** + * Entry point for the crunch-lambda API. Use this to create {@link LCollection}, {@link LTable} and + * {@link LGroupedTable} objects from their corresponding {@link PCollection}, {@link PTable} and {@link PGroupedTable} + * types. + * + * <p>The crunch-lambda API allows you to write Crunch pipelines using lambda expressions and method references instead + * of creating classes (anonymous, inner, or top level) for each operation that needs to be completed. 
Many pipelines + * are composed of a large number of simple operations, rather than a small number of complex operations, making this + * strategy much more efficient to code and easy to read for those able to use Java 8 in their distributed computation + * environments.</p> + * + * <p>You use the API by wrapping your Crunch type into an L-type object. This class provides static methods for that. + * You can then use the lambda API methods on the L-type object, yielding more L-type objects. If at any point you need + * to go back to the standard Crunch world (for compatibility with existing code or complex use cases), you can at any + * time call {@code underlying()} on an L-type object to get a Crunch object.</p> + * + * <p>Example (the obligatory wordcount):</p> + * + * <pre>{@code + * Pipeline pipeline = new MRPipeline(getClass()); + * LCollection<String> inputText = Lambda.wrap(pipeline.readTextFile("/path/to/input/file")); + * inputText.flatMap(line -> Arrays.stream(line.split(" ")), Writables.strings()) + * .count() + * .map(wordCountPair -> wordCountPair.first() + ": " + wordCountPair.second(), Writables.strings()) + * .write(To.textFile("/path/to/output/file")); + * pipeline.run(); + * }</pre> + * + */ +public class Lambda { + private static LCollectionFactory INSTANCE = new LCollectionFactoryImpl(); + + public static <S> LCollection<S> wrap(PCollection<S> collection) { return INSTANCE.wrap(collection); } + public static <K, V> LTable<K, V> wrap(PTable<K, V> collection) { return INSTANCE.wrap(collection); } + public static <K, V> LGroupedTable<K, V> wrap(PGroupedTable<K, V> collection) { return INSTANCE.wrap(collection); } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiConsumer.java ---------------------------------------------------------------------- diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiConsumer.java
b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiConsumer.java new file mode 100644 index 0000000..6e5030f --- /dev/null +++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiConsumer.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lambda.fn; + +import java.io.Serializable; +import java.util.function.BiConsumer; + +/** + * Serializable version of the Java BiConsumer functional interface. + */ +@FunctionalInterface +public interface SBiConsumer<K, V> extends BiConsumer<K, V>, Serializable { +} http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiFunction.java ---------------------------------------------------------------------- diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiFunction.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiFunction.java new file mode 100644 index 0000000..5aac5bc --- /dev/null +++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiFunction.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lambda.fn; + +import java.io.Serializable; +import java.util.function.BiFunction; + +/** + * Serializable version of the Java BiFunction functional interface. + */ +@FunctionalInterface +public interface SBiFunction<K, V, T> extends BiFunction<K, V, T>, Serializable { +} http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBinaryOperator.java ---------------------------------------------------------------------- diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBinaryOperator.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBinaryOperator.java new file mode 100644 index 0000000..d1e4cc0 --- /dev/null +++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBinaryOperator.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lambda.fn; + +import java.io.Serializable; +import java.util.function.BinaryOperator; + +/** + * Serializable version of the Java BinaryOperator functional interface. + */ +@FunctionalInterface +public interface SBinaryOperator<T> extends BinaryOperator<T>, Serializable { +} http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SConsumer.java ---------------------------------------------------------------------- diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SConsumer.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SConsumer.java new file mode 100644 index 0000000..90f4a99 --- /dev/null +++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SConsumer.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lambda.fn; + +import java.io.Serializable; +import java.util.function.Consumer; + +/** + * Serializable version of the Java Consumer functional interface. + */ +@FunctionalInterface +public interface SConsumer<T> extends Consumer<T>, Serializable { +} http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SFunction.java ---------------------------------------------------------------------- diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SFunction.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SFunction.java new file mode 100644 index 0000000..d120efe --- /dev/null +++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SFunction.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lambda.fn; + +import java.io.Serializable; +import java.util.function.Function; + +/** + * Serializable version of the Java Function functional interface. 
+ */ +@FunctionalInterface +public interface SFunction<S, T> extends Function<S, T>, Serializable { +} http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SPredicate.java ---------------------------------------------------------------------- diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SPredicate.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SPredicate.java new file mode 100644 index 0000000..9e90bab --- /dev/null +++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SPredicate.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lambda.fn; + +import java.io.Serializable; +import java.util.function.Predicate; + +/** + * Serializable version of the Java Predicate functional interface. 
+ */ +@FunctionalInterface +public interface SPredicate<T> extends Predicate<T>, Serializable { +} http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SSupplier.java ---------------------------------------------------------------------- diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SSupplier.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SSupplier.java new file mode 100644 index 0000000..eea254a --- /dev/null +++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SSupplier.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lambda.fn; + +import java.io.Serializable; +import java.util.function.Supplier; + +/** + * Serializable version of the Java Supplier functional interface. 
+ */ +@FunctionalInterface +public interface SSupplier<T> extends Supplier<T>, Serializable { +} http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/package-info.java ---------------------------------------------------------------------- diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/package-info.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/package-info.java new file mode 100644 index 0000000..ad18232 --- /dev/null +++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * Serializable versions of the functional interfaces that ship with Java 8 + */ +package org.apache.crunch.lambda.fn; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/package-info.java ---------------------------------------------------------------------- diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/package-info.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/package-info.java new file mode 100644 index 0000000..9c03148 --- /dev/null +++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/package-info.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <p>Alternative Crunch API using Java 8 features to allow construction of pipelines using lambda functions and method + * references. 
It works by wrapping standard Crunch {@link org.apache.crunch.PCollection}, + * {@link org.apache.crunch.PTable} and {@link org.apache.crunch.PGroupedTable} instances into the corresponding + * {@link org.apache.crunch.lambda.LCollection}, {@link org.apache.crunch.lambda.LTable} and + * {@link org.apache.crunch.lambda.LGroupedTable} classes.</p> + * + * <p>The {@link org.apache.crunch.lambda.Lambda} class has static methods to create these. Please also see the Javadocs + * for {@link org.apache.crunch.lambda.Lambda} for usage examples.</p> + */ +package org.apache.crunch.lambda; + http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/test/java/org/apache/crunch/lambda/LCollectionTest.java ---------------------------------------------------------------------- diff --git a/crunch-lambda/src/test/java/org/apache/crunch/lambda/LCollectionTest.java b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LCollectionTest.java new file mode 100644 index 0000000..b819d0d --- /dev/null +++ b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LCollectionTest.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.crunch.lambda; + +import com.google.common.collect.ImmutableMap; +import org.apache.crunch.Pair; +import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.crunch.types.avro.Avros; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; + +import static org.apache.crunch.lambda.TestCommon.*; +import static org.apache.crunch.lambda.TypedRecord.rec; +import static org.apache.crunch.types.avro.Avros.*; +import static org.junit.Assert.*; + +public class LCollectionTest { + + private LCollection<TypedRecord> lc() { + return Lambda.wrap(MemPipeline.typedCollectionOf(Avros.reflects(TypedRecord.class), + rec(14, "Alice", 101L), + rec(25, "Bo B", 102L), + rec(21, "Char Lotte", 103L), + rec(28, "David", 104L), + rec(31, "Erik", 105L))); + } + + @Test + public void testParallelDo() throws Exception { + LCollection<String> result = lc().parallelDo(ctx -> { if (ctx.element().key > 26) ctx.emit(ctx.element().name); }, strings()); + assertCollectionOf(result, "David", "Erik"); + } + + @Test + public void testParallelDoPair() throws Exception { + LTable<Integer, String> result = lc().parallelDo(ctx -> { + if (ctx.element().key > 26) ctx.emit(Pair.of(ctx.element().key, ctx.element().name)); }, tableOf(ints(), strings())); + assertCollectionOf(result, Pair.of(28, "David"), Pair.of(31, "Erik")); + } + + + @Test + public void testMap() throws Exception { + assertCollectionOf(lc().map(r -> r.key, ints()), 14, 25, 21, 28, 31); + } + + @Test + public void testMapPair() throws Exception { + assertCollectionOf(lc().map(r -> Pair.of(r.key, r.value), tableOf(ints(), longs())), + Pair.of(14, 101L), + Pair.of(25, 102L), + Pair.of(21, 103L), + Pair.of(28, 104L), + Pair.of(31, 105L)); + } + + @Test + public void testFlatMap() throws Exception { + assertCollectionOf( + lc().flatMap(s -> Arrays.stream(s.name.split(" ")), strings()), + "Alice", "Bo", "B", "Char", "Lotte", "David", "Erik"); + } + + + @Test + public void 
testFilterMap() throws Exception { + Map<String, String> lookupMap = ImmutableMap.of("Erik", "BOOM", "Alice", "POW"); + assertCollectionOf( + lc().filterMap(r -> lookupMap.containsKey(r.name) ? Optional.of(lookupMap.get(r.name)) : Optional.empty(), strings()), + "BOOM", "POW" + ); + } + + @Test + public void testFilter() throws Exception { + assertCollectionOf(lc().filter(r -> r.key == 21), rec(21, "Char Lotte", 103L)); + } + + + @Test + public void testIncrement() throws Exception { + lc().increment("hello", "world"); + long value = MemPipeline.getCounters().findCounter("hello", "world").getValue(); + assertEquals(5L, value); + } + + @Test + public void testIncrementIf() throws Exception { + lc().incrementIf("hello", "conditional_world", r -> r.key < 25); + long value = MemPipeline.getCounters().findCounter("hello", "conditional_world").getValue(); + assertEquals(2L, value); + } + + @Test + public void testBy() throws Exception { + assertCollectionOf( + lc().filter(r -> r.key == 21).by(r -> r.key, ints()), + Pair.of(21, rec(21, "Char Lotte", 103L))); + } + + @Test + public void testCount() throws Exception { + assertCollectionOf( + Lambda.wrap(MemPipeline.typedCollectionOf(strings(), "a", "a", "a", "b", "b")).count(), + Pair.of("a", 3L), + Pair.of("b", 2L) + ); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/test/java/org/apache/crunch/lambda/LGroupedTableTest.java ---------------------------------------------------------------------- diff --git a/crunch-lambda/src/test/java/org/apache/crunch/lambda/LGroupedTableTest.java b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LGroupedTableTest.java new file mode 100644 index 0000000..043387e --- /dev/null +++ b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LGroupedTableTest.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lambda; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.apache.crunch.Pair; +import org.apache.crunch.fn.Aggregators; +import org.apache.crunch.impl.mem.MemPipeline; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +import static org.apache.crunch.lambda.TestCommon.assertCollectionOf; +import static org.apache.crunch.types.avro.Avros.*; + + +public class LGroupedTableTest { + + LGroupedTable<String, Integer> lgt = Lambda.wrap(MemPipeline.typedTableOf(tableOf(strings(), ints()), + "a", 2, + "a", 3, + "b", 5, + "c", 7, + "c", 11, + "c", 13, + "c", 13)) + .groupByKey(); + + @Test + public void testCombineValues() throws Exception { + assertCollectionOf(lgt.combineValues(Aggregators.MAX_INTS()), + Pair.of("a", 3), + Pair.of("b", 5), + Pair.of("c", 13)); + } + + @Test + public void testCombineValues1() throws Exception { + assertCollectionOf(lgt.combineValues(() -> Integer.MIN_VALUE, Integer::max, Collections::singleton), + Pair.of("a", 3), + Pair.of("b", 5), + Pair.of("c", 13)); + } + + @Test + public void testMapValues() throws Exception { + assertCollectionOf(lgt.mapValues(vs -> vs.map(i -> i.toString()).reduce((a, b) 
-> a + "," + b).get(), strings()), + Pair.of("a", "2,3"), + Pair.of("b", "5"), + Pair.of("c", "7,11,13,13")); + } + + @Test + public void testCollectValues() throws Exception { + assertCollectionOf(lgt.collectValues(ArrayList::new, Collection::add, collections(ints())), + Pair.of("a", ImmutableList.of(2,3)), + Pair.of("b", ImmutableList.of(5)), + Pair.of("c", ImmutableList.of(7, 11, 13, 13))); + } + + @Test + public void testCollectAllValues() throws Exception { + assertCollectionOf(lgt.collectAllValues(), + Pair.of("a", ImmutableList.of(2,3)), + Pair.of("b", ImmutableList.of(5)), + Pair.of("c", ImmutableList.of(7, 11, 13, 13))); + } + + @Test + public void testCollectUniqueValues() throws Exception { + assertCollectionOf(lgt.collectUniqueValues(), + Pair.of("a", ImmutableSet.of(2, 3)), + Pair.of("b", ImmutableSet.of(5)), + Pair.of("c", ImmutableSet.of(7, 11, 13))); + } + + @Test + public void testReduceValues() throws Exception { + assertCollectionOf(lgt.reduceValues((a, b) -> a * b), + Pair.of("a", 6), + Pair.of("b", 5), + Pair.of("c", 7 * 11 * 13 * 13) + ); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/test/java/org/apache/crunch/lambda/LTableTest.java ---------------------------------------------------------------------- diff --git a/crunch-lambda/src/test/java/org/apache/crunch/lambda/LTableTest.java b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LTableTest.java new file mode 100644 index 0000000..f66ada5 --- /dev/null +++ b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LTableTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lambda; + +import com.google.common.collect.ImmutableList; +import org.apache.crunch.Pair; +import org.apache.crunch.impl.mem.MemPipeline; +import org.junit.Test; + +import static org.apache.crunch.lambda.TestCommon.assertCollectionOf; +import static org.apache.crunch.types.avro.Avros.*; + + +public class LTableTest { + + private LTable<String, Integer> lt1 = Lambda.wrap(MemPipeline.typedTableOf(tableOf(strings(), ints()), + "a", 2, + "a", 3, + "b", 5, + "c", 7, + "c", 11, + "c", 13, + "c", 13)); + + private LTable<String, Long> lt2 = Lambda.wrap(MemPipeline.typedTableOf(tableOf(strings(), longs()), + "a", 101L, + "b", 102L, + "c", 103L + )); + + @Test + public void testKeys() throws Exception { + assertCollectionOf(lt1.keys(), "a", "a", "b", "c", "c", "c", "c"); + } + + @Test + public void testValues() throws Exception { + assertCollectionOf(lt2.values(), 101L, 102L, 103L); + } + + @Test + public void testMapKeys() throws Exception { + assertCollectionOf(lt2.mapKeys(String::toUpperCase, strings()), + Pair.of("A", 101L), + Pair.of("B", 102L), + Pair.of("C", 103L) + ); + } + + @Test + public void testMapValues() throws Exception { + assertCollectionOf(lt2.mapValues(v -> v * 2, longs()), + Pair.of("a", 202L), + Pair.of("b", 204L), + Pair.of("c", 206L) + ); + } + + @Test + public void testJoin() throws Exception { + assertCollectionOf(lt1.join(lt2).values(), + 
Pair.of(2, 101L), + Pair.of(3, 101L), + Pair.of(5, 102L), + Pair.of(7, 103L), + Pair.of(11, 103L), + Pair.of(13, 103L), + Pair.of(13, 103L)); + } + + @Test + public void testCogroup() throws Exception { + assertCollectionOf(lt1.cogroup(lt2).values(), + Pair.of(ImmutableList.of(2, 3), ImmutableList.of(101L)), + Pair.of(ImmutableList.of(5), ImmutableList.of(102L)), + Pair.of(ImmutableList.of(7, 11, 13, 13), ImmutableList.of(103L)) + ); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/test/java/org/apache/crunch/lambda/TestCommon.java ---------------------------------------------------------------------- diff --git a/crunch-lambda/src/test/java/org/apache/crunch/lambda/TestCommon.java b/crunch-lambda/src/test/java/org/apache/crunch/lambda/TestCommon.java new file mode 100644 index 0000000..02101ab --- /dev/null +++ b/crunch-lambda/src/test/java/org/apache/crunch/lambda/TestCommon.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.lambda; + +import com.google.common.collect.Sets; + +import java.util.Set; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; + +public class TestCommon { + @SafeVarargs + public static <T> void assertCollectionOf(LCollection<T> actual, T... expected) { + Set<T> actualSet = actual.materialize().collect(Collectors.toSet()); + Set<T> expectedSet = Sets.newHashSet(expected); + assertEquals(expectedSet, actualSet); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/test/java/org/apache/crunch/lambda/TypedRecord.java ---------------------------------------------------------------------- diff --git a/crunch-lambda/src/test/java/org/apache/crunch/lambda/TypedRecord.java b/crunch-lambda/src/test/java/org/apache/crunch/lambda/TypedRecord.java new file mode 100644 index 0000000..42540de --- /dev/null +++ b/crunch-lambda/src/test/java/org/apache/crunch/lambda/TypedRecord.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.lambda; + +public class TypedRecord { + public int key; + public String name; + public long value; + public static TypedRecord rec(int key, String name, long value) { + TypedRecord record = new TypedRecord(); + record.key = key; + record.name = name; + record.value = value; + return record; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TypedRecord that = (TypedRecord) o; + + if (key != that.key) return false; + if (value != that.value) return false; + return name != null ? name.equals(that.name) : that.name == null; + + } + + @Override + public int hashCode() { + int result = key; + result = 31 * result + (name != null ? name.hashCode() : 0); + result = 31 * result + (int) (value ^ (value >>> 32)); + return result; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 0758e43..b628a77 100644 --- a/pom.xml +++ b/pom.xml @@ -55,6 +55,17 @@ under the License. <module>crunch-hive</module> <module>crunch-dist</module> </modules> + <profiles> + <profile> + <id>java-8</id> + <activation> + <jdk>[1.8,]</jdk> + </activation> + <modules> + <module>crunch-lambda</module> + </modules> + </profile> + </profiles> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> @@ -163,6 +174,12 @@ under the License. </dependency> <dependency> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-lambda</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version>
