http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ad90b151/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFactory.java ---------------------------------------------------------------------- diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFactory.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFactory.java new file mode 100644 index 0000000..825b445 --- /dev/null +++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFactory.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.contrib.bloomfilter; + +import java.io.IOException; +import java.util.Map; + +import org.apache.crunch.CombineFn.Aggregator; +import org.apache.crunch.CombineFn.AggregatorCombineFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PObject; +import org.apache.crunch.PTable; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.materialize.pobject.FirstElementPObject; +import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.bloom.BloomFilter; + +import com.google.common.collect.ImmutableList; + +/** + * Factory Class for creating BloomFilters. The APIs require a + * {@link BloomFilterFn} which is responsible for generating keys of the filter. + */ +public class BloomFilterFactory { + /** + * The method will take an input path and generates BloomFilters for all text + * files in that path. 
The method return back a {@link PObject} containing a + * {@link Map} having file names as keys and filters as values + */ + public static PObject<Map<String, BloomFilter>> createFilter(Path inputPath, BloomFilterFn<String> filterFn) + throws IOException { + MRPipeline pipeline = new MRPipeline(BloomFilterFactory.class); + FileStatus[] listStatus = FileSystem.get(pipeline.getConfiguration()).listStatus(inputPath); + PTable<String, BloomFilter> filterTable = null; + for (FileStatus fileStatus : listStatus) { + Path path = fileStatus.getPath(); + PCollection<String> readTextFile = pipeline.readTextFile(path.toString()); + pipeline.getConfiguration().set(BloomFilterFn.CRUNCH_FILTER_NAME, path.getName()); + PTable<String, BloomFilter> currentTable = createFilterTable(readTextFile, filterFn); + if (filterTable != null) { + filterTable = filterTable.union(currentTable); + } else { + filterTable = currentTable; + } + } + return filterTable.asMap(); + } + + public static <T> PObject<BloomFilter> createFilter(PCollection<T> collection, BloomFilterFn<T> filterFn) { + collection.getPipeline().getConfiguration().set(BloomFilterFn.CRUNCH_FILTER_NAME, collection.getName()); + return new FirstElementPObject<BloomFilter>(createFilterTable(collection, filterFn).values()); + } + + private static <T> PTable<String, BloomFilter> createFilterTable(PCollection<T> collection, BloomFilterFn<T> filterFn) { + PTypeFamily tf = collection.getTypeFamily(); + PTable<String, BloomFilter> table = collection.parallelDo(filterFn, + tf.tableOf(tf.strings(), Writables.writables(BloomFilter.class))); + return table.groupByKey(1).combineValues(new AggregatorCombineFn<String, BloomFilter>(new BloomFilterAggregator())); + } + +} + +@SuppressWarnings("serial") +class BloomFilterAggregator implements Aggregator<BloomFilter> { + private transient BloomFilter bloomFilter = null; + private transient int filterSize; + + @Override + public void update(BloomFilter value) { + bloomFilter.or(value); + } + + 
@Override + public Iterable<BloomFilter> results() { + return ImmutableList.of(bloomFilter); + } + + @Override + public void initialize(Configuration configuration) { + filterSize = BloomFilterFn.getBloomFilterSize(configuration); + } + + @Override + public void reset() { + bloomFilter = BloomFilterFn.initializeFilter(filterSize); + + } + +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ad90b151/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFn.java ---------------------------------------------------------------------- diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFn.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFn.java new file mode 100644 index 0000000..7d27b33 --- /dev/null +++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFn.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.contrib.bloomfilter; + +import java.util.Collection; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.bloom.BloomFilter; +import org.apache.hadoop.util.bloom.Key; +import org.apache.hadoop.util.hash.Hash; + +/** + * The class is responsible for generating keys that are used in a BloomFilter + */ +@SuppressWarnings("serial") +public abstract class BloomFilterFn<S> extends DoFn<S, Pair<String, BloomFilter>> { + public static final String CRUNCH_FILTER_SIZE = "crunch.filter.size"; + public static final String CRUNCH_FILTER_NAME = "crunch.filter.name"; + private transient BloomFilter bloomFilter = null; + + @Override + public void initialize() { + super.initialize(); + bloomFilter = initializeFilter(getBloomFilterSize(getConfiguration())); + } + + @Override + public void process(S input, Emitter<Pair<String, BloomFilter>> emitter) { + Collection<Key> keys = generateKeys(input); + if (CollectionUtils.isNotEmpty(keys)) + bloomFilter.add(keys); + } + + public abstract Collection<Key> generateKeys(S input); + + @Override + public void cleanup(Emitter<Pair<String, BloomFilter>> emitter) { + String filterName = getConfiguration().get(CRUNCH_FILTER_NAME); + emitter.emit(Pair.of(filterName, bloomFilter)); + } + + static BloomFilter initializeFilter(int size) { + return new BloomFilter(size, 5, Hash.MURMUR_HASH); + } + + static int getBloomFilterSize(Configuration configuration) { + return configuration.getInt(CRUNCH_FILTER_SIZE, 1024); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ad90b151/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/package-info.java ---------------------------------------------------------------------- diff --git 
a/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/package-info.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/package-info.java new file mode 100644 index 0000000..8ce703e --- /dev/null +++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * BloomFilters are space and time efficient and Hadoop has support for creating + * them. This package provides support for creating BloomFilters in crunch. 
+ */ +package org.apache.crunch.contrib.bloomfilter; + http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ad90b151/crunch-contrib/src/main/java/org/apache/crunch/contrib/package-info.java ---------------------------------------------------------------------- diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/package-info.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/package-info.java new file mode 100644 index 0000000..7f5eee7 --- /dev/null +++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/package-info.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The package contains some interesting contributions from the users of + * crunch. These interesting things did not have a place in the core library of crunch + * but they are quite useful for the users of crunch. 
+ */ +package org.apache.crunch.contrib; + http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ad90b151/crunch-dist/pom.xml ---------------------------------------------------------------------- diff --git a/crunch-dist/pom.xml b/crunch-dist/pom.xml index d7ff63e..b770685 100644 --- a/crunch-dist/pom.xml +++ b/crunch-dist/pom.xml @@ -57,6 +57,10 @@ under the License. <groupId>org.apache.crunch</groupId> <artifactId>crunch-scrunch</artifactId> </dependency> + <dependency> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-contrib</artifactId> + </dependency> </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ad90b151/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index b2a7ba2..3ec2af0 100644 --- a/pom.xml +++ b/pom.xml @@ -47,6 +47,7 @@ under the License. <module>crunch</module> <module>crunch-hbase</module> <module>crunch-test</module> + <module>crunch-contrib</module> <module>crunch-examples</module> <module>crunch-archetype</module> <module>crunch-scrunch</module> @@ -156,6 +157,12 @@ under the License. <artifactId>crunch-scrunch</artifactId> <version>${project.version}</version> </dependency> + + <dependency> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-contrib</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>com.google.guava</groupId>
