Updated Branches: refs/heads/master 11e9b53e2 -> 6e6234138
http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/OutputConverterFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/OutputConverterFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/OutputConverterFunction.java new file mode 100644 index 0000000..b1184d8 --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/OutputConverterFunction.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.spark.fn; + +import org.apache.crunch.types.Converter; +import org.apache.spark.api.java.function.PairFunction; +import scala.Tuple2; + +public class OutputConverterFunction<K, V, S> extends PairFunction<S, K, V> { + private Converter<K, V, S, ?> converter; + + public OutputConverterFunction(Converter<K, V, S, ?> converter) { + this.converter = converter; + } + + @Override + public Tuple2<K, V> call(S s) throws Exception { + return new Tuple2<K, V>(converter.outputKey(s), converter.outputValue(s)); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java new file mode 100644 index 0000000..b2d93a0 --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.spark.fn; + +import com.google.common.collect.Iterables; +import org.apache.crunch.DoFn; +import org.apache.crunch.Pair; +import org.apache.crunch.impl.spark.GuavaUtils; +import org.apache.crunch.impl.spark.SparkRuntimeContext; +import org.apache.spark.api.java.function.PairFlatMapFunction; +import scala.Tuple2; + +import java.util.Iterator; + +public class PairFlatMapDoFn<T, K, V> extends PairFlatMapFunction<Iterator<T>, K, V> { + private final DoFn<T, Pair<K, V>> fn; + private final SparkRuntimeContext ctxt; + + public PairFlatMapDoFn(DoFn<T, Pair<K, V>> fn, SparkRuntimeContext ctxt) { + this.fn = fn; + this.ctxt = ctxt; + } + + @Override + public Iterable<Tuple2<K, V>> call(Iterator<T> input) throws Exception { + ctxt.initialize(fn); + return Iterables.transform( + new CrunchIterable<T, Pair<K, V>>(fn, input), + GuavaUtils.<K, V>pair2tupleFunc()); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapPairDoFn.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapPairDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapPairDoFn.java new file mode 100644 index 0000000..bc3e701 --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapPairDoFn.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.spark.fn; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import org.apache.crunch.DoFn; +import org.apache.crunch.Pair; +import org.apache.crunch.impl.spark.GuavaUtils; +import org.apache.crunch.impl.spark.SparkRuntimeContext; +import org.apache.spark.api.java.function.PairFlatMapFunction; +import scala.Tuple2; + +import java.util.Iterator; + +public class PairFlatMapPairDoFn<K, V, K2, V2> extends PairFlatMapFunction<Iterator<Tuple2<K, V>>, K2, V2> { + private final DoFn<Pair<K, V>, Pair<K2, V2>> fn; + private final SparkRuntimeContext ctxt; + + public PairFlatMapPairDoFn(DoFn<Pair<K, V>, Pair<K2, V2>> fn, SparkRuntimeContext ctxt) { + this.fn = fn; + this.ctxt = ctxt; + } + + @Override + public Iterable<Tuple2<K2, V2>> call(Iterator<Tuple2<K, V>> input) throws Exception { + ctxt.initialize(fn); + return Iterables.transform( + new CrunchIterable<Pair<K, V>, Pair<K2, V2>>( + fn, + Iterators.transform(input, GuavaUtils.<K, V>tuple2PairFunc())), + GuavaUtils.<K2, V2>pair2tupleFunc()); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java new file mode 100644 index 0000000..6db30f0 --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.spark.fn; + +import org.apache.crunch.MapFn; +import org.apache.crunch.Pair; +import org.apache.crunch.impl.spark.SparkRuntimeContext; +import org.apache.spark.api.java.function.Function; +import scala.Tuple2; + +public class PairMapFunction<K, V, S> extends Function<Tuple2<K, V>, S> { + private final MapFn<Pair<K, V>, S> fn; + private final SparkRuntimeContext ctxt; + private boolean initialized; + + public PairMapFunction(MapFn<Pair<K, V>, S> fn, SparkRuntimeContext ctxt) { + this.fn = fn; + this.ctxt = ctxt; + } + + @Override + public S call(Tuple2<K, V> kv) throws Exception { + if (!initialized) { + ctxt.initialize(fn); + initialized = true; + } + return fn.map(Pair.of(kv._1, kv._2)); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapIterableFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapIterableFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapIterableFunction.java new file mode 100644 index 0000000..7bfe378 --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapIterableFunction.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.spark.fn; + +import org.apache.crunch.MapFn; +import org.apache.crunch.Pair; +import org.apache.crunch.impl.spark.SparkRuntimeContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; +import scala.Tuple2; + +import java.util.List; + +public class PairMapIterableFunction<K, V, S, T> extends PairFunction<Pair<K, List<V>>, S, Iterable<T>> { + + private final MapFn<Pair<K, List<V>>, Pair<S, Iterable<T>>> fn; + private final SparkRuntimeContext runtimeContext; + private boolean initialized; + + public PairMapIterableFunction( + MapFn<Pair<K, List<V>>, Pair<S, Iterable<T>>> fn, + SparkRuntimeContext runtimeContext) { + this.fn = fn; + this.runtimeContext = runtimeContext; + } + + @Override + public Tuple2<S, Iterable<T>> call(Pair<K, List<V>> input) throws Exception { + if (!initialized) { + runtimeContext.initialize(fn); + initialized = true; + } + Pair<S, Iterable<T>> out = fn.map(input); + return new Tuple2<S, Iterable<T>>(out.first(), out.second()); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java new file mode 100644 index 0000000..a10b7f6 --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.spark.fn; + +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.GroupingOptions; +import org.apache.crunch.Pair; +import org.apache.crunch.impl.spark.ByteArray; +import org.apache.crunch.impl.spark.IntByteArray; +import org.apache.crunch.impl.spark.SparkRuntimeContext; +import org.apache.crunch.impl.spark.serde.SerDe; +import org.apache.crunch.types.PGroupedTableType; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; +import scala.Tuple2; + +import java.io.IOException; + +public class PartitionedMapOutputFunction<K, V> extends PairFunction<Pair<K, V>, IntByteArray, byte[]> { + + private final SerDe<K> keySerde; + private final SerDe<V> valueSerde; + private final PGroupedTableType<K, V> ptype; + private final Class<? extends Partitioner> partitionerClass; + private final int numPartitions; + private final SparkRuntimeContext runtimeContext; + private transient Partitioner partitioner; + + public PartitionedMapOutputFunction( + SerDe<K> keySerde, + SerDe<V> valueSerde, + PGroupedTableType<K, V> ptype, + Class<? extends Partitioner> partitionerClass, + int numPartitions, + SparkRuntimeContext runtimeContext) { + this.keySerde = keySerde; + this.valueSerde = valueSerde; + this.ptype = ptype; + this.partitionerClass = partitionerClass; + this.numPartitions = numPartitions; + this.runtimeContext = runtimeContext; + } + + @Override + public Tuple2<IntByteArray, byte[]> call(Pair<K, V> p) throws Exception { + int partition = getPartitioner().getPartition(p.first(), p.second(), numPartitions); + return new Tuple2<IntByteArray, byte[]>( + new IntByteArray(partition, keySerde.toBytes(p.first())), + valueSerde.toBytes(p.second())); + } + + private Partitioner getPartitioner() { + if (partitioner == null) { + try { + ptype.initialize(runtimeContext.getConfiguration()); + Job job = new Job(runtimeContext.getConfiguration()); + ptype.configureShuffle(job, GroupingOptions.builder().partitionerClass(partitionerClass).build()); + partitioner = ReflectionUtils.newInstance(partitionerClass, job.getConfiguration()); + } catch (IOException e) { + throw new CrunchRuntimeException("Error configuring partitioner", e); + } + } + return partitioner; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java new file mode 100644 index 0000000..35dd7dd --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.spark.fn; + +import com.google.common.collect.Lists; +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.GroupingOptions; +import org.apache.crunch.impl.spark.ByteArray; +import org.apache.crunch.impl.spark.SparkRuntimeContext; +import org.apache.crunch.types.PGroupedTableType; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.spark.api.java.function.PairFlatMapFunction; +import scala.Tuple2; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +public class ReduceGroupingFunction extends PairFlatMapFunction<Iterator<Tuple2<ByteArray, List<byte[]>>>, + ByteArray, List<byte[]>> { + + private final GroupingOptions options; + private final PGroupedTableType ptype; + private final SparkRuntimeContext ctxt; + private transient RawComparator<?> cmp; + + public ReduceGroupingFunction(GroupingOptions options, + PGroupedTableType ptype, + SparkRuntimeContext ctxt) { + this.options = options; + this.ptype = ptype; + this.ctxt = ctxt; + } + + @Override + public Iterable<Tuple2<ByteArray, List<byte[]>>> call( + final Iterator<Tuple2<ByteArray, List<byte[]>>> iter) throws Exception { + return new Iterable<Tuple2<ByteArray, List<byte[]>>>() { + @Override + public Iterator<Tuple2<ByteArray, List<byte[]>>> iterator() { + return new GroupingIterator(iter, rawComparator()); + } + }; + } + + private RawComparator<?> rawComparator() { + if (cmp == null) { + try { + Job job = new Job(ctxt.getConfiguration()); + ptype.configureShuffle(job, options); + cmp = ReflectionUtils.newInstance(options.getGroupingComparatorClass(), job.getConfiguration()); + } catch (IOException e) { + throw new CrunchRuntimeException("Error configuring grouping comparator", e); + } + } + return cmp; + } + + private static class GroupingIterator implements Iterator<Tuple2<ByteArray, List<byte[]>>> { + + private final Iterator<Tuple2<ByteArray, List<byte[]>>> iter; + private final RawComparator cmp; + private ByteArray key; + private List<byte[]> bytes = Lists.newArrayList(); + + public GroupingIterator(Iterator<Tuple2<ByteArray, List<byte[]>>> iter, RawComparator cmp) { + this.iter = iter; + this.cmp = cmp; + } + + @Override + public boolean hasNext() { + return iter.hasNext() || key != null; + } + + @Override + public Tuple2<ByteArray, List<byte[]>> next() { + ByteArray nextKey = null; + List<byte[]> next = null; + while (iter.hasNext()) { + Tuple2<ByteArray, List<byte[]>> t = iter.next(); + if (key == null) { + key = t._1; + bytes.addAll(t._2); + } else if (cmp.compare(key.value, 0, key.value.length, t._1.value, 0, t._1.value.length) == 0) { + bytes.addAll(t._2); + } else { + nextKey = t._1; + next = Lists.newArrayList(t._2); + break; + } + } + Tuple2<ByteArray, List<byte[]>> ret = new Tuple2<ByteArray, List<byte[]>>(key, bytes); + key = nextKey; + bytes = next; + return ret; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceInputFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceInputFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceInputFunction.java new file mode 100644 index 0000000..4ebdfaa --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceInputFunction.java @@ -0,0 +1,44 @@ +/* + * * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ +package org.apache.crunch.impl.spark.fn; + +import com.google.common.collect.Lists; +import org.apache.crunch.Pair; +import org.apache.crunch.impl.spark.ByteArray; +import org.apache.crunch.impl.spark.serde.SerDe; +import org.apache.spark.api.java.function.Function; +import scala.Tuple2; + +import java.util.List; + +public class ReduceInputFunction<K, V> extends Function<Tuple2<ByteArray, List<byte[]>>, Pair<K, List<V>>> { + private final SerDe<K> keySerDe; + private final SerDe<V> valueSerDe; + + public ReduceInputFunction(SerDe<K> keySerDe, SerDe<V> valueSerDe) { + this.keySerDe = keySerDe; + this.valueSerDe = valueSerDe; + } + + @Override + public Pair<K, List<V>> call(Tuple2<ByteArray, List<byte[]>> kv) throws Exception { + return Pair.of(keySerDe.fromBytes(kv._1.value), Lists.transform(kv._2, valueSerDe.fromBytesFunction())); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java new file mode 100644 index 0000000..e6e08a0 --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.spark.serde; + +import com.google.common.base.Function; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.mapred.AvroWrapper; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.crunch.types.avro.AvroType; +import org.apache.crunch.types.avro.Avros; + +import javax.annotation.Nullable; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +public class AvroSerDe<T> implements SerDe<T> { + + private AvroType<T> avroType; + private transient DatumWriter<T> writer; + private transient DatumReader<T> reader; + + public AvroSerDe(AvroType<T> avroType) { + this.avroType = avroType; + if (avroType.hasReflect() && avroType.hasSpecific()) { + Avros.checkCombiningSpecificAndReflectionSchemas(); + } + } + + private DatumWriter<T> getWriter() { + if (writer == null) { + if (avroType.hasReflect()) { + writer = new ReflectDatumWriter<T>(avroType.getSchema()); + } else if (avroType.hasSpecific()) { + writer = new SpecificDatumWriter<T>(avroType.getSchema()); + } else { + writer = new GenericDatumWriter<T>(avroType.getSchema()); + } + } + return writer; + } + + private DatumReader<T> getReader() { + if (reader == null) { + if (avroType.hasReflect()) { + reader = new ReflectDatumReader<T>(avroType.getSchema()); + } else if (avroType.hasSpecific()) { + reader = new SpecificDatumReader<T>(avroType.getSchema()); + } else { + reader = new GenericDatumReader<T>(avroType.getSchema()); + } + } + return reader; + } + + @Override + public byte[] toBytes(T obj) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); + getWriter().write(obj, encoder); + encoder.flush(); + out.close(); + return out.toByteArray(); + } + + @Override + public T fromBytes(byte[] bytes) { + Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null); + try { + return getReader().read(null, decoder); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Function<byte[], T> fromBytesFunction() { + return new Function<byte[], T>() { + @Override + public T apply(@Nullable byte[] input) { + return fromBytes(input); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java new file mode 100644 index 0000000..d374a41 --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.spark.serde; + +import com.google.common.base.Function; + +import java.io.Serializable; + +public interface SerDe<T> extends Serializable { + byte[] toBytes(T obj) throws Exception; + + T fromBytes(byte[] bytes); + + Function<byte[], T> fromBytesFunction(); +} http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/WritableSerDe.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/WritableSerDe.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/WritableSerDe.java new file mode 100644 index 0000000..d90007d --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/WritableSerDe.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.spark.serde; + +import com.google.common.base.Function; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.ReflectionUtils; + +import javax.annotation.Nullable; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +public class WritableSerDe implements SerDe<Writable> { + + Class<? extends Writable> clazz; + + public WritableSerDe(Class<? extends Writable> clazz) { + this.clazz = clazz; + } + + @Override + public byte[] toBytes(Writable obj) throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + obj.write(dos); + dos.close(); + return baos.toByteArray(); + } + + @Override + public Writable fromBytes(byte[] bytes) { + Writable inst = ReflectionUtils.newInstance(clazz, null); + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + DataInputStream dis = new DataInputStream(bais); + try { + inst.readFields(dis); + } catch (IOException e) { + throw new RuntimeException(e); + } + return inst; + } + + @Override + public Function<byte[], Writable> fromBytesFunction() { + return new Function<byte[], Writable>() { + @Override + public Writable apply(@Nullable byte[] input) { + return fromBytes(input); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 7e288d3..4bacc9b 100644 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,7 @@ under the License. <module>crunch-examples</module> <module>crunch-archetype</module> <module>crunch-scrunch</module> + <module>crunch-spark</module> <module>crunch-dist</module> </modules> @@ -78,13 +79,14 @@ under the License. <jackson.version>1.8.8</jackson.version> <protobuf-java.version>2.4.0a</protobuf-java.version> <libthrift.version>0.8.0</libthrift.version> - <slf4j.version>1.4.3</slf4j.version> + <slf4j.version>1.6.1</slf4j.version> <log4j.version>1.2.15</log4j.version> <junit.version>4.10</junit.version> <hamcrest.version>1.1</hamcrest.version> <mockito.version>1.9.0</mockito.version> - <scala.version>2.9.2</scala.version> - <scalatest.version>1.7.2</scalatest.version> + <scala.version>2.9.3</scala.version> + <scalatest.version>1.9.1</scalatest.version> + <spark.version>0.8.0-incubating</spark.version> <pkg>org.apache.crunch</pkg> </properties> @@ -137,6 +139,12 @@ under the License. <artifactId>crunch-scrunch</artifactId> <version>${project.version}</version> </dependency> + + <dependency> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-spark</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.crunch</groupId> @@ -336,6 +344,12 @@ under the License. </dependency> <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.version}</artifactId> + <version>${spark.version}</version> + </dependency> + + <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> @@ -647,7 +661,7 @@ under the License. <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> - <version>3.1.5</version> + <version>3.1.6</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId>
