Updated Branches: refs/heads/master 4a3aa0dfd -> ac317a185
CRUNCH-97: Add support for parsing structured records out of text files to crunch-contrib. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/ac317a18 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/ac317a18 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/ac317a18 Branch: refs/heads/master Commit: ac317a185401cb0ef1afe7096b51ac29106eb1e2 Parents: 4a3aa0d Author: Josh Wills <[email protected]> Authored: Mon Oct 15 20:04:18 2012 -0700 Committer: Josh Wills <[email protected]> Committed: Thu Dec 13 11:18:14 2012 -0800 ---------------------------------------------------------------------- .../contrib/text/AbstractCompositeExtractor.java | 100 +++ .../contrib/text/AbstractSimpleExtractor.java | 97 +++ .../org/apache/crunch/contrib/text/Extractor.java | 67 ++ .../apache/crunch/contrib/text/ExtractorStats.java | 59 ++ .../org/apache/crunch/contrib/text/Extractors.java | 548 +++++++++++++++ .../java/org/apache/crunch/contrib/text/Parse.java | 132 ++++ .../org/apache/crunch/contrib/text/Tokenizer.java | 135 ++++ .../crunch/contrib/text/TokenizerFactory.java | 173 +++++ .../org/apache/crunch/contrib/text/ParseTest.java | 120 ++++ 9 files changed, 1431 insertions(+), 0 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractCompositeExtractor.java ---------------------------------------------------------------------- diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractCompositeExtractor.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractCompositeExtractor.java new file mode 100644 index 0000000..f26fc57 --- /dev/null +++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractCompositeExtractor.java @@ -0,0 +1,100 @@ +/** 
+ * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.contrib.text; + +import java.util.List; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +/** + * Base class for {@code Extractor} instances that delegates the parsing of fields to other + * {@code Extractor} instances, primarily used for constructing composite records that implement + * the {@code Tuple} interface. 
+ */ +public abstract class AbstractCompositeExtractor<T> implements Extractor<T> { + + private final TokenizerFactory tokenizerFactory; + private int errors = 0; + private boolean errorOnLast; + private final List<Extractor<?>> extractors; + + public AbstractCompositeExtractor(TokenizerFactory scannerFactory, List<Extractor<?>> extractors) { + Preconditions.checkArgument(extractors.size() > 0); + this.tokenizerFactory = scannerFactory; + this.extractors = extractors; + } + + @Override + public T extract(String input) { + errorOnLast = false; + Tokenizer tokenizer = tokenizerFactory.create(input); + Object[] values = new Object[extractors.size()]; + try { + for (int i = 0; i < values.length; i++) { + values[i] = extractors.get(i).extract(tokenizer.next()); + if (extractors.get(i).errorOnLastRecord() && !errorOnLast) { + errors++; + errorOnLast = true; + } + } + } catch (Exception e) { + if (!errorOnLast) { + errors++; + errorOnLast = true; + } + return getDefaultValue(); + } + + return doCreate(values); + } + + @Override + public void initialize() { + this.errors = 0; + this.errorOnLast = false; + for (Extractor<?> x : extractors) { + x.initialize(); + } + } + + @Override + public boolean errorOnLastRecord() { + return errorOnLast; + } + + @Override + public ExtractorStats getStats() { + return new ExtractorStats(errors, Lists.transform(extractors, new Function<Extractor<?>, Integer>() { + @Override + public Integer apply(Extractor<?> input) { + return input.getStats().getErrorCount(); + } + })); + } + + /** + * Subclasses should return a new instance of the object based on the fields parsed by + * the {@code Extractor} instances for this composite {@code Extractor} instance. 
+ * + * @param values The values that were extracted by the component {@code Extractor} objects + * @return A new instance of the composite class for this {@code Extractor} + */ + protected abstract T doCreate(Object[] values); +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractSimpleExtractor.java ---------------------------------------------------------------------- diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractSimpleExtractor.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractSimpleExtractor.java new file mode 100644 index 0000000..9959b44 --- /dev/null +++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/AbstractSimpleExtractor.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.contrib.text; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Base class for the common case {@code Extractor} instances that construct a single + * object from a block of text stored in a {@code String}, with support for error handling + * and reporting. 
+ */ +public abstract class AbstractSimpleExtractor<T> implements Extractor<T> { + + private static final Log LOG = LogFactory.getLog(AbstractSimpleExtractor.class); + private static final int LOG_ERROR_LIMIT = 100; + + private int errors; + private boolean errorOnLast; + private final T defaultValue; + private final TokenizerFactory scannerFactory; + + public AbstractSimpleExtractor(T defaultValue) { + this(defaultValue, TokenizerFactory.getDefaultInstance()); + } + + public AbstractSimpleExtractor(T defaultValue, TokenizerFactory scannerFactory) { + this.defaultValue = defaultValue; + this.scannerFactory = scannerFactory; + } + + @Override + public void initialize() { + this.errors = 0; + this.errorOnLast = false; + } + + @Override + public T extract(String input) { + errorOnLast = false; + T res = defaultValue; + try { + res = doExtract(scannerFactory.create(input)); + } catch (Exception e) { + errorOnLast = true; + errors++; + if (errors < LOG_ERROR_LIMIT) { + String msg = String.format("Error occurred parsing input '%s' using extractor %s", + input, this); + LOG.error(msg, e); + } + } + return res; + } + + @Override + public boolean errorOnLastRecord() { + return errorOnLast; + } + + @Override + public T getDefaultValue() { + return defaultValue; + } + + @Override + public ExtractorStats getStats() { + return new ExtractorStats(errors); + } + + /** + * Subclasses must override this method to return a new instance of the + * class that this {@code Extractor} instance is designed to parse. + * <p>Any runtime parsing exceptions from the given {@code Tokenizer} instance + * should be thrown so that they may be caught by the error handling logic + * inside of this class. 
+ * + * @param tokenizer The {@code Tokenizer} instance for the current record + * @return A new instance of the type defined for this class + */ + protected abstract T doExtract(Tokenizer tokenizer); +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractor.java ---------------------------------------------------------------------- diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractor.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractor.java new file mode 100644 index 0000000..1e7f6b7 --- /dev/null +++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractor.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.contrib.text; + +import java.io.Serializable; + +import org.apache.crunch.types.PType; +import org.apache.crunch.types.PTypeFamily; + +/** + * An interface for extracting a specific data type from a text string that + * is being processed by a {@code Tokenizer} object. 
+ * + * @param <T> The data type to be extracted + */ +public interface Extractor<T> extends Serializable { + + /** + * Extract a value with the type of this instance. + */ + T extract(String input); + + /** + * Returns the {@code PType} associated with this data type for the + * given {@code PTypeFamily}. + */ + PType<T> getPType(PTypeFamily ptf); + + /** + * Returns the default value for this {@code Extractor} in case of an + * error. + */ + T getDefaultValue(); + + /** + * Perform any initialization required by this {@code Extractor} during the + * start of a map or reduce task. + */ + void initialize(); + + /** + * Returns true if the last call to {@code extract} on this instance + * threw an exception that was handled. + */ + boolean errorOnLastRecord(); + + /** + * Return statistics about how many errors this {@code Extractor} instance + * encountered while parsing input data. + */ + ExtractorStats getStats(); +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/ExtractorStats.java ---------------------------------------------------------------------- diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/ExtractorStats.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/ExtractorStats.java new file mode 100644 index 0000000..7dc37f4 --- /dev/null +++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/ExtractorStats.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.contrib.text; + +import java.util.List; + +import com.google.common.collect.ImmutableList; + +/** + * Records the number and kind of errors that an {@code Extractor} encountered when parsing + * input data. + */ +public class ExtractorStats { + + private final int errorCount; + private final List<Integer> fieldErrors; + + public ExtractorStats(int errorCount) { + this(errorCount, ImmutableList.<Integer>of()); + } + + public ExtractorStats(int errorCount, List<Integer> fieldErrors) { + this.errorCount = errorCount; + this.fieldErrors = fieldErrors; + } + + /** + * The overall number of records that had some kind of parsing error. + * @return The overall number of records that had some kind of parsing error + */ + public int getErrorCount() { + return errorCount; + } + + /** + * Returns the number of errors that occurred when parsing the individual fields of + * a composite record type, like a {@code Pair} or {@code TupleN}. 
+ * @return The number of errors that occurred when parsing the individual fields of + * a composite record type + */ + public List<Integer> getFieldErrors() { + return fieldErrors; + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractors.java ---------------------------------------------------------------------- diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractors.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractors.java new file mode 100644 index 0000000..0ed1282 --- /dev/null +++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Extractors.java @@ -0,0 +1,548 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.contrib.text; + +import java.lang.reflect.Constructor; +import java.util.Collection; +import java.util.Scanner; + +import org.apache.crunch.Pair; +import org.apache.crunch.Tuple; +import org.apache.crunch.Tuple3; +import org.apache.crunch.Tuple4; +import org.apache.crunch.TupleN; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.avro.AvroTypeFamily; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +/** + * Factory methods for constructing common {@code Extractor} types. + */ +public class Extractors { + + /** + * Returns an Extractor for integers. + */ + public static Extractor<Integer> xint() { + return xint(0); + } + + /** + * Returns an Extractor for integers. + */ + public static Extractor<Integer> xint(Integer defaultValue) { + return new IntExtractor(defaultValue); + } + + /** + * Returns an Extractor for longs. + */ + public static Extractor<Long> xlong() { + return xlong(0L); + } + + /** + * Returns an Extractor for longs. + */ + public static Extractor<Long> xlong(Long defaultValue) { + return new LongExtractor(defaultValue); + } + + /** + * Returns an Extractor for floats. + */ + public static Extractor<Float> xfloat() { + return xfloat(0f); + } + + public static Extractor<Float> xfloat(Float defaultValue) { + return new FloatExtractor(defaultValue); + } + + /** + * Returns an Extractor for doubles. + */ + public static Extractor<Double> xdouble() { + return xdouble(0.0); + } + + public static Extractor<Double> xdouble(Double defaultValue) { + return new DoubleExtractor(defaultValue); + } + + /** + * Returns an Extractor for booleans. 
+ */ + public static Extractor<Boolean> xboolean() { + return xboolean(false); + } + + public static Extractor<Boolean> xboolean(Boolean defaultValue) { + return new BooleanExtractor(defaultValue); + } + + /** + * Returns an Extractor for strings. + */ + public static Extractor<String> xstring() { + return xstring(""); + } + + public static Extractor<String> xstring(String defaultValue) { + return new StringExtractor(defaultValue); + } + + public static <T> Extractor<Collection<T>> xcollect(TokenizerFactory scannerFactory, Extractor<T> extractor) { + return new CollectionExtractor<T>(scannerFactory, extractor); + } + + /** + * Returns an Extractor for pairs of the given types that uses the given {@code TokenizerFactory} + * for parsing the sub-fields. + */ + public static <K, V> Extractor<Pair<K, V>> xpair(TokenizerFactory scannerFactory, + Extractor<K> one, Extractor<V> two) { + return new PairExtractor<K, V>(scannerFactory, one, two); + } + + /** + * Returns an Extractor for triples of the given types that uses the given {@code TokenizerFactory} + * for parsing the sub-fields. + */ + public static <A, B, C> Extractor<Tuple3<A, B, C>> xtriple(TokenizerFactory scannerFactory, Extractor<A> a, + Extractor<B> b, Extractor<C> c) { + return new TripExtractor<A, B, C>(scannerFactory, a, b, c); + } + + /** + * Returns an Extractor for quads of the given types that uses the given {@code TokenizerFactory} + * for parsing the sub-fields. + */ + public static <A, B, C, D> Extractor<Tuple4<A, B, C, D>> xquad(TokenizerFactory scannerFactory, Extractor<A> a, + Extractor<B> b, Extractor<C> c, Extractor<D> d) { + return new QuadExtractor<A, B, C, D>(scannerFactory, a, b, c, d); + } + + /** + * Returns an Extractor for an arbitrary number of types that uses the given {@code TokenizerFactory} + * for parsing the sub-fields. 
+ */ + public static Extractor<TupleN> xtupleN(TokenizerFactory scannerFactory, Extractor...extractors) { + return new TupleNExtractor(scannerFactory, extractors); + } + + /** + * Returns an Extractor for a subclass of {@code Tuple} with a constructor that + * has the given extractor types that uses the given {@code TokenizerFactory} + * for parsing the sub-fields. + */ + public static <T extends Tuple> Extractor<T> xcustom(Class<T> clazz, TokenizerFactory scannerFactory, Extractor... extractors) { + return new CustomTupleExtractor<T>(scannerFactory, clazz, extractors); + } + + private static class IntExtractor extends AbstractSimpleExtractor<Integer> { + + public IntExtractor(Integer defaultValue) { + super(defaultValue); + } + + @Override + protected Integer doExtract(Tokenizer tokenizer) { + return tokenizer.nextInt(); + } + + @Override + public PType<Integer> getPType(PTypeFamily ptf) { + return ptf.ints(); + } + + @Override + public String toString() { + return "xint"; + } + } + + private static class LongExtractor extends AbstractSimpleExtractor<Long> { + public LongExtractor(Long defaultValue) { + super(defaultValue); + } + + @Override + protected Long doExtract(Tokenizer tokenizer) { + return tokenizer.nextLong(); + } + + @Override + public PType<Long> getPType(PTypeFamily ptf) { + return ptf.longs(); + } + + @Override + public String toString() { + return "xlong"; + } + }; + + private static class FloatExtractor extends AbstractSimpleExtractor<Float> { + public FloatExtractor(Float defaultValue) { + super(defaultValue); + } + + @Override + protected Float doExtract(Tokenizer tokenizer) { + return tokenizer.nextFloat(); + } + + @Override + public PType<Float> getPType(PTypeFamily ptf) { + return ptf.floats(); + } + + @Override + public String toString() { + return "xfloat"; + } + }; + + private static class DoubleExtractor extends AbstractSimpleExtractor<Double> { + public DoubleExtractor(Double defaultValue) { + super(defaultValue); + } + + @Override + 
protected Double doExtract(Tokenizer tokenizer) { + return tokenizer.nextDouble(); + } + + @Override + public PType<Double> getPType(PTypeFamily ptf) { + return ptf.doubles(); + } + + @Override + public String toString() { + return "xdouble"; + } + }; + + private static class BooleanExtractor extends AbstractSimpleExtractor<Boolean> { + + public BooleanExtractor(Boolean defaultValue) { + super(defaultValue); + } + + @Override + protected Boolean doExtract(Tokenizer tokenizer) { + return tokenizer.nextBoolean(); + } + + @Override + public PType<Boolean> getPType(PTypeFamily ptf) { + return ptf.booleans(); + } + + @Override + public String toString() { + return "xboolean"; + } + }; + + private static class StringExtractor extends AbstractSimpleExtractor<String> { + + public StringExtractor(String defaultValue) { + super(defaultValue); + } + + @Override + protected String doExtract(Tokenizer tokenizer) { + return tokenizer.next(); + } + + @Override + public PType<String> getPType(PTypeFamily ptf) { + return ptf.strings(); + } + + @Override + public String toString() { + return "xstring"; + } + }; + + private static class CollectionExtractor<T> implements Extractor<Collection<T>> { + + private final TokenizerFactory tokenizerFactory; + private final Extractor<T> extractor; + private int errors = 0; + private boolean errorOnLast; + + public CollectionExtractor(TokenizerFactory tokenizerFactory, Extractor<T> extractor) { + this.tokenizerFactory = tokenizerFactory; + this.extractor = extractor; + } + + @Override + public Collection<T> extract(String input) { + errorOnLast = false; + Tokenizer tokenizer = tokenizerFactory.create(input); + Collection<T> parsed = Lists.newArrayList(); + while (tokenizer.hasNext()) { + parsed.add(extractor.extract(tokenizer.next())); + if (extractor.errorOnLastRecord() && !errorOnLast) { + errorOnLast = true; + errors++; + } + } + return parsed; + } + + @Override + public PType<Collection<T>> getPType(PTypeFamily ptf) { + return 
ptf.collections(extractor.getPType(ptf)); + } + + @Override + public Collection<T> getDefaultValue() { + return ImmutableList.<T>of(); + } + + @Override + public ExtractorStats getStats() { + return new ExtractorStats(errors, + ImmutableList.of(extractor.getStats().getErrorCount())); + } + + @Override + public void initialize() { + this.errorOnLast = false; + this.errors = 0; + extractor.initialize(); + } + + @Override + public boolean errorOnLastRecord() { + return errorOnLast; + } + + } + + private static class PairExtractor<K, V> extends AbstractCompositeExtractor<Pair<K, V>> { + private final Extractor<K> one; + private final Extractor<V> two; + + public PairExtractor(TokenizerFactory scannerFactory, Extractor<K> one, Extractor<V> two) { + super(scannerFactory, ImmutableList.<Extractor<?>>of(one, two)); + this.one = one; + this.two = two; + } + + @Override + protected Pair<K, V> doCreate(Object[] values) { + return Pair.of((K) values[0], (V) values[1]); + } + + @Override + public PType<Pair<K, V>> getPType(PTypeFamily ptf) { + return ptf.pairs(one.getPType(ptf), two.getPType(ptf)); + } + + @Override + public String toString() { + return "xpair(" + one + "," + two + ")"; + } + + @Override + public Pair<K, V> getDefaultValue() { + return Pair.of(one.getDefaultValue(), two.getDefaultValue()); + } + }; + + private static class TripExtractor<A, B, C> extends AbstractCompositeExtractor<Tuple3<A, B, C>> { + private final Extractor<A> one; + private final Extractor<B> two; + private final Extractor<C> three; + + public TripExtractor(TokenizerFactory sf, Extractor<A> one, Extractor<B> two, Extractor<C> three) { + super(sf, ImmutableList.<Extractor<?>>of(one, two, three)); + this.one = one; + this.two = two; + this.three = three; + } + + @Override + protected Tuple3<A, B, C> doCreate(Object[] values) { + return Tuple3.of((A) values[0], (B) values[1], (C) values[2]); + } + + @Override + public PType<Tuple3<A, B, C>> getPType(PTypeFamily ptf) { + return 
ptf.triples(one.getPType(ptf), two.getPType(ptf), three.getPType(ptf)); + } + + @Override + public Tuple3<A, B, C> getDefaultValue() { + return Tuple3.of(one.getDefaultValue(), two.getDefaultValue(), three.getDefaultValue()); + } + + @Override + public String toString() { + return "xtriple(" + one + "," + two + "," + three + ")"; + } + }; + + private static class QuadExtractor<A, B, C, D> extends AbstractCompositeExtractor<Tuple4<A, B, C, D>> { + private final Extractor<A> one; + private final Extractor<B> two; + private final Extractor<C> three; + private final Extractor<D> four; + + public QuadExtractor(TokenizerFactory sf, Extractor<A> one, Extractor<B> two, Extractor<C> three, + Extractor<D> four) { + super(sf, ImmutableList.<Extractor<?>>of(one, two, three, four)); + this.one = one; + this.two = two; + this.three = three; + this.four = four; + } + + @Override + protected Tuple4<A, B, C, D> doCreate(Object[] values) { + return Tuple4.of((A) values[0], (B) values[1], (C) values[2], (D) values[3]); + } + + @Override + public PType<Tuple4<A, B, C, D>> getPType(PTypeFamily ptf) { + return ptf.quads(one.getPType(ptf), two.getPType(ptf), three.getPType(ptf), + four.getPType(ptf)); + } + + @Override + public Tuple4<A, B, C, D> getDefaultValue() { + return Tuple4.of(one.getDefaultValue(), two.getDefaultValue(), three.getDefaultValue(), + four.getDefaultValue()); + } + + @Override + public String toString() { + return "xquad(" + one + "," + two + "," + three + "," + four + ")"; + } + }; + + private static class TupleNExtractor extends AbstractCompositeExtractor<TupleN> { + private final Extractor[] extractors; + + public TupleNExtractor(TokenizerFactory scannerFactory, Extractor...extractors) { + super(scannerFactory, ImmutableList.<Extractor<?>>copyOf(extractors)); + this.extractors = extractors; + } + + @Override + protected TupleN doCreate(Object[] values) { + return new TupleN(values); + } + + @Override + public PType<TupleN> getPType(PTypeFamily ptf) { + PType[] 
ptypes = new PType[extractors.length]; + for (int i = 0; i < ptypes.length; i++) { + ptypes[i] = extractors[i].getPType(ptf); + } + return ptf.tuples(ptypes); + } + + @Override + public TupleN getDefaultValue() { + Object[] values = new Object[extractors.length]; + for (int i = 0; i < values.length; i++) { + values[i] = extractors[i].getDefaultValue(); + } + return doCreate(values); + } + + @Override + public String toString() { + return "xtupleN(" + Joiner.on(',').join(extractors) + ")"; + } + }; + + private static class CustomTupleExtractor<T extends Tuple> extends AbstractCompositeExtractor<T> { + + private final Class<T> clazz; + private final Extractor[] extractors; + + private transient Constructor<T> constructor; + + public CustomTupleExtractor(TokenizerFactory sf, Class<T> clazz, Extractor... extractors) { + super(sf, ImmutableList.<Extractor<?>>copyOf(extractors)); + this.clazz = clazz; + this.extractors = extractors; + } + + @Override + public void initialize() { + super.initialize(); + + Class[] typeArgs = new Class[extractors.length]; + for (int i = 0; i < typeArgs.length; i++) { + typeArgs[i] = extractors[i].getPType( + AvroTypeFamily.getInstance()).getTypeClass(); + } + try { + constructor = clazz.getConstructor(typeArgs); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public T doCreate(Object[] values) { + try { + return constructor.newInstance(values); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public PType<T> getPType(PTypeFamily ptf) { + PType[] ptypes = new PType[extractors.length]; + for (int i = 0; i < ptypes.length; i++) { + ptypes[i] = extractors[i].getPType(ptf); + } + return ptf.tuples(clazz, ptypes); + } + + @Override + public T getDefaultValue() { + Object[] values = new Object[extractors.length]; + for (int i = 0; i < values.length; i++) { + values[i] = extractors[i].getDefaultValue(); + } + return doCreate(values); + } + + @Override + public String toString() { 
+ return "Extractor(" + clazz + ")"; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Parse.java ---------------------------------------------------------------------- diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Parse.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Parse.java new file mode 100644 index 0000000..a1c610b --- /dev/null +++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Parse.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.contrib.text; + +import java.util.List; + +import org.apache.crunch.Emitter; +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.PTypeFamily; + +/** + * Methods for parsing instances of {@code PCollection<String>} into {@code PCollection}'s of strongly-typed + * tuples. 
+ */
+public final class Parse {
+
+  // Non-instantiable.
+  private Parse() { }
+
+  /**
+   * Runs every line of {@code input} through {@code extractor}, building the result type from
+   * the {@code PTypeFamily} of {@code input}.
+   *
+   * @param groupName Name passed to {@code parallelDo}; also the counter group under which
+   *     extraction errors are reported
+   * @param input The {@code PCollection<String>} whose lines are parsed
+   * @param extractor The {@code Extractor<T>} applied to each line
+   * @return A {@code PCollection<T>} of the values produced by {@code extractor}
+   */
+  public static <T> PCollection<T> parse(String groupName, PCollection<String> input,
+      Extractor<T> extractor) {
+    PTypeFamily family = input.getTypeFamily();
+    return parse(groupName, input, family, extractor);
+  }
+
+  /**
+   * Runs every line of {@code input} through {@code extractor}, asking the extractor for its
+   * {@code PType} in the given {@code PTypeFamily}.
+   *
+   * @param groupName Name passed to {@code parallelDo}; also the counter group under which
+   *     extraction errors are reported
+   * @param input The {@code PCollection<String>} whose lines are parsed
+   * @param ptf The {@code PTypeFamily} used to obtain the result {@code PType}
+   * @param extractor The {@code Extractor<T>} applied to each line
+   * @return A {@code PCollection<T>} of the values produced by {@code extractor}
+   */
+  public static <T> PCollection<T> parse(String groupName, PCollection<String> input, PTypeFamily ptf,
+      Extractor<T> extractor) {
+    ExtractorFn<T> fn = new ExtractorFn<T>(groupName, extractor);
+    return input.parallelDo(groupName, fn, extractor.getPType(ptf));
+  }
+
+  /**
+   * Runs every line of {@code input} through a pair-producing {@code extractor} and returns the
+   * pairs as a {@code PTable<K, V>}, using the {@code PTypeFamily} of {@code input}.
+   *
+   * @param groupName Name passed to {@code parallelDo}; also the counter group under which
+   *     extraction errors are reported
+   * @param input The {@code PCollection<String>} whose lines are parsed
+   * @param extractor The {@code Extractor<Pair<K, V>>} applied to each line
+   * @return A {@code PTable<K, V>} of the pairs produced by {@code extractor}
+   */
+  public static <K, V> PTable<K, V> parseTable(String groupName, PCollection<String> input,
+      Extractor<Pair<K, V>> extractor) {
+    PTypeFamily family = input.getTypeFamily();
+    return parseTable(groupName, input, family, extractor);
+  }
+
+  /**
+   * Runs every line of {@code input} through a pair-producing {@code extractor} and returns the
+   * pairs as a {@code PTable<K, V>} whose table type is built in the given {@code PTypeFamily}.
+   *
+   * @param groupName Name passed to {@code parallelDo}; also the counter group under which
+   *     extraction errors are reported
+   * @param input The {@code PCollection<String>} whose lines are parsed
+   * @param ptf The {@code PTypeFamily} used to build the table type
+   * @param extractor The {@code Extractor<Pair<K, V>>} applied to each line
+   * @return A {@code PTable<K, V>} of the pairs produced by {@code extractor}
+   */
+  public static <K, V> PTable<K, V> parseTable(String groupName, PCollection<String> input,
+      PTypeFamily ptf, Extractor<Pair<K, V>> extractor) {
+    // The first two sub-types of the extractor's PType are taken as the key and value types.
+    List<PType> subTypes = extractor.getPType(ptf).getSubTypes();
+    PType<K> keyType = (PType<K>) subTypes.get(0);
+    PType<V> valueType = (PType<V>) subTypes.get(1);
+    PTableType<K, V> tableType = ptf.tableOf(keyType, valueType);
+    ExtractorFn<Pair<K, V>> fn = new ExtractorFn<Pair<K, V>>(groupName, extractor);
+    return input.parallelDo(groupName, fn, tableType);
+  }
+
+  /**
+   * A {@code MapFn} that delegates each input line to an {@code Extractor} and publishes the
+   * extractor's error statistics as counters during cleanup.
+   */
+  private static class ExtractorFn<T> extends MapFn<String, T> {
+
+    private final String groupName;
+    private final Extractor<T> extractor;
+
+    public ExtractorFn(String groupName, Extractor<T> extractor) {
+      this.extractor = extractor;
+      this.groupName = groupName;
+    }
+
+    @Override
+    public void initialize() {
+      extractor.initialize();
+    }
+
+    @Override
+    public T map(String input) {
+      return extractor.extract(input);
+    }
+
+    @Override
+    public void cleanup(Emitter<T> emitter) {
+      // NOTE(review): presumably the context is null when running without a task context
+      // (e.g. in memory), in which case there is nowhere to report counters -- confirm.
+      if (getContext() == null) {
+        return;
+      }
+      ExtractorStats stats = extractor.getStats();
+      getCounter(groupName, "OVERALL_ERRORS").increment(stats.getErrorCount());
+      // One counter per field position, named by the position in the field-error list.
+      int field = 0;
+      for (Integer errors : stats.getFieldErrors()) {
+        getCounter(groupName, "ERRORS_FOR_FIELD_" + field).increment(errors);
+        field++;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Tokenizer.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Tokenizer.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Tokenizer.java
new file mode 100644
index 0000000..8de90b6
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/Tokenizer.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.text;
+
+import java.util.Scanner;
+import java.util.Set;
+
+/**
+ * Manages a {@link Scanner} instance and provides support for returning only a subset
+ * of the fields returned by the underlying {@code Scanner}.
+ */
+public class Tokenizer {
+
+  private final Scanner scanner;
+  private final Set<Integer> indices;
+  private final boolean keep;
+  // Zero-based position of the field the scanner is lined up on; starts at -1 and is only
+  // maintained when indices is non-empty.
+  private int current;
+  // True once the filtered-out fields in front of the next read have been consumed, so that
+  // hasNext() followed by a next*() call moves the field position only once.
+  private boolean positioned;
+
+  /**
+   * Create a new {@code Tokenizer} instance.
+   *
+   * @param scanner The scanner to manage
+   * @param indices The indices to keep/drop; an empty set disables filtering entirely
+   * @param keep Whether the indices should be kept (true) or dropped (false)
+   * @throws IllegalArgumentException if any index is negative
+   */
+  public Tokenizer(Scanner scanner, Set<Integer> indices, boolean keep) {
+    this.scanner = scanner;
+    this.indices = checkIndices(indices);
+    this.keep = keep;
+    this.current = -1;
+    this.positioned = false;
+  }
+
+  // Rejects negative indices and hands the set back unchanged.
+  private static Set<Integer> checkIndices(Set<Integer> indices) {
+    for (Integer index : indices) {
+      if (index < 0) {
+        throw new IllegalArgumentException("All tokenizer indices must be non-negative");
+      }
+    }
+    return indices;
+  }
+
+  /**
+   * Moves to the next field position and consumes every field that the keep/drop setting
+   * filters out, leaving the scanner on the next field to be returned (or at end of input).
+   * Does nothing when no indices were configured or when the scanner is already positioned.
+   */
+  private void advance() {
+    if (positioned || indices.isEmpty()) {
+      return;
+    }
+    current++;
+    // A field is skipped when it is missing from a keep-set or present in a drop-set, i.e.
+    // when keep != contains. The hasNext() guard covers both modes; the previous condition
+    // relied on && binding tighter than || and left the drop mode unguarded.
+    while (scanner.hasNext() && keep != indices.contains(current)) {
+      scanner.next();
+      current++;
+    }
+    positioned = true;
+  }
+
+  // Positions the scanner for a read and marks the position as used up, so that the read
+  // after this one advances again.
+  private void prepareRead() {
+    advance();
+    positioned = false;
+  }
+
+  /**
+   * Returns true if a field that passes the keep/drop filter remains. Filtered-out fields
+   * in front of it are consumed first, so trailing fields that would never be returned do
+   * not make this method report true.
+   */
+  public boolean hasNext() {
+    advance();
+    return scanner.hasNext();
+  }
+
+  /**
+   * Advance this {@code Tokenizer} and return the next String from the {@code Scanner}.
+   *
+   * @return The next String from the {@code Scanner}
+   */
+  public String next() {
+    prepareRead();
+    return scanner.next();
+  }
+
+  /**
+   * Advance this {@code Tokenizer} and return the next Long from the {@code Scanner}.
+   *
+   * @return The next Long from the {@code Scanner}
+   */
+  public Long nextLong() {
+    prepareRead();
+    return scanner.nextLong();
+  }
+
+  /**
+   * Advance this {@code Tokenizer} and return the next Boolean from the {@code Scanner}.
+   *
+   * @return The next Boolean from the {@code Scanner}
+   */
+  public Boolean nextBoolean() {
+    prepareRead();
+    return scanner.nextBoolean();
+  }
+
+  /**
+   * Advance this {@code Tokenizer} and return the next Double from the {@code Scanner}.
+   *
+   * @return The next Double from the {@code Scanner}
+   */
+  public Double nextDouble() {
+    prepareRead();
+    return scanner.nextDouble();
+  }
+
+  /**
+   * Advance this {@code Tokenizer} and return the next Float from the {@code Scanner}.
+   *
+   * @return The next Float from the {@code Scanner}
+   */
+  public Float nextFloat() {
+    prepareRead();
+    return scanner.nextFloat();
+  }
+
+  /**
+   * Advance this {@code Tokenizer} and return the next Integer from the {@code Scanner}.
+   *
+   * @return The next Integer from the {@code Scanner}
+   */
+  public Integer nextInt() {
+    prepareRead();
+    return scanner.nextInt();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/TokenizerFactory.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/TokenizerFactory.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/TokenizerFactory.java
new file mode 100644
index 0000000..f43478d
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/text/TokenizerFactory.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.text;
+
+import java.io.Serializable;
+import java.util.Locale;
+import java.util.Scanner;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Factory class that constructs {@link Tokenizer} instances for input strings that use a fixed
+ * set of delimiters, skip patterns, locales, and sets of indices to keep or drop.
+ */
+public class TokenizerFactory implements Serializable {
+
+  // Shared default configuration; final so the singleton reference cannot be reassigned.
+  private static final TokenizerFactory DEFAULT_INSTANCE = new TokenizerFactory(null, null, null,
+      ImmutableSet.<Integer>of(), true);
+
+  // A null delim, skip or locale means the corresponding Scanner setting is left untouched.
+  private final String delim;
+  private final String skip;
+  private final Locale locale;
+  private final Set<Integer> indices;
+  private final boolean keep;
+
+  /**
+   * Returns a default {@code TokenizerFactory} that uses whitespace as a delimiter and does
+   * not skip any input fields.
+   * @return The default {@code TokenizerFactory}
+   */
+  public static TokenizerFactory getDefaultInstance() { return DEFAULT_INSTANCE; }
+
+  private TokenizerFactory(String delim, String skip, Locale locale,
+      Set<Integer> indices, boolean keep) {
+    this.delim = delim;
+    this.skip = skip;
+    this.locale = locale;
+    this.indices = indices;
+    this.keep = keep;
+  }
+
+  /**
+   * Return a {@code Tokenizer} that wraps a new {@code Scanner} over the input string. The
+   * scanner uses the delimiter, skip, and locale settings of this {@code TokenizerFactory}
+   * instance, and the tokenizer applies its keep/drop indices.
+   *
+   * @param input The input string
+   * @return A new {@code Tokenizer} instance with appropriate settings
+   * @throws java.util.NoSuchElementException if a skip pattern is configured and it does not
+   *     match at the start of {@code input} (see {@link Scanner#skip(String)})
+   */
+  public Tokenizer create(String input) {
+    Scanner s = new Scanner(input);
+    if (delim != null) {
+      s.useDelimiter(delim);
+    }
+    if (skip != null) {
+      // The skip pattern is applied once, at the current (start) position of the input.
+      s.skip(skip);
+    }
+    if (locale != null) {
+      s.useLocale(locale);
+    }
+    return new Tokenizer(s, indices, keep);
+  }
+
+  /**
+   * Factory method for creating a {@code TokenizerFactory.Builder} instance.
+   * @return A new {@code TokenizerFactory.Builder}
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * A class for constructing new {@code TokenizerFactory} instances using the Builder pattern.
+   * Settings that are never set stay {@code null} (delimiter, skip, locale) or empty (indices).
+   */
+  public static class Builder {
+    private String delim;
+    private String skip;
+    private Locale locale;
+    private Set<Integer> indices = ImmutableSet.of();
+    private boolean keep;
+
+    /**
+     * Sets the delimiter used by the {@code TokenizerFactory} instances constructed by
+     * this instance.
+     * @param delim The delimiter to use, which may be a regular expression
+     * @return This {@code Builder} instance
+     */
+    public Builder delimiter(String delim) {
+      this.delim = delim;
+      return this;
+    }
+
+    /**
+     * Sets the regular expression that is passed to {@link Scanner#skip(String)} when a
+     * {@code Tokenizer} is created by the constructed {@code TokenizerFactory}.
+     *
+     * @param skip The regular expression of input values to ignore
+     * @return This {@code Builder} instance
+     */
+    public Builder skip(String skip) {
+      this.skip = skip;
+      return this;
+    }
+
+    /**
+     * Sets the {@code Locale} to use with the {@code TokenizerFactory} returned by
+     * this {@code Builder} instance.
+     *
+     * @param locale The locale to use
+     * @return This {@code Builder} instance
+     */
+    public Builder locale(Locale locale) {
+      this.locale = locale;
+      return this;
+    }
+
+    /**
+     * Keep only the specified fields found by the input scanner, counting from
+     * zero.
+     *
+     * @param indices The indices to keep
+     * @return This {@code Builder} instance
+     * @throws IllegalArgumentException if keep or drop indices were already set
+     */
+    public Builder keep(Integer... indices) {
+      Preconditions.checkArgument(this.indices.isEmpty(),
+          "Cannot set keep indices more than once");
+      this.indices = ImmutableSet.copyOf(indices);
+      this.keep = true;
+      return this;
+    }
+
+    /**
+     * Drop the specified fields found by the input scanner, counting from zero.
+     *
+     * @param indices The indices to drop
+     * @return This {@code Builder} instance
+     * @throws IllegalArgumentException if keep or drop indices were already set
+     */
+    public Builder drop(Integer... indices) {
+      Preconditions.checkArgument(this.indices.isEmpty(),
+          "Cannot set drop indices more than once");
+      this.indices = ImmutableSet.copyOf(indices);
+      this.keep = false;
+      return this;
+    }
+
+    /**
+     * Returns a new {@code TokenizerFactory} with settings determined by this
+     * {@code Builder} instance.
+     * @return A new {@code TokenizerFactory}
+     */
+    public TokenizerFactory build() {
+      return new TokenizerFactory(delim, skip, locale, indices, keep);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ac317a18/crunch-contrib/src/test/java/org/apache/crunch/contrib/text/ParseTest.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/test/java/org/apache/crunch/contrib/text/ParseTest.java b/crunch-contrib/src/test/java/org/apache/crunch/contrib/text/ParseTest.java
new file mode 100644
index 0000000..4da7521
--- /dev/null
+++ b/crunch-contrib/src/test/java/org/apache/crunch/contrib/text/ParseTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.contrib.text;
+
+import static org.apache.crunch.contrib.text.Extractors.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collection;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.contrib.text.Parse;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Tests for the extractors in {@code Extractors}, the keep/drop/skip settings of
+ * {@code TokenizerFactory}, and {@code Parse.parse} on an in-memory collection.
+ */
+public class ParseTest {
+
+  @Test
+  public void testInt() {
+    Integer parsed = Integer.valueOf(1729);
+    assertEquals(parsed, xint().extract("1729"));
+    // "foo" is not an integer, so the supplied default is expected back.
+    Integer fallback = Integer.valueOf(321);
+    assertEquals(fallback, xint(321).extract("foo"));
+  }
+
+  @Test
+  public void testString() {
+    String line = "bar";
+    assertEquals("bar", xstring().extract(line));
+  }
+
+  @Test
+  public void testPairWithDrop() {
+    // Fields 0 ("foo") and 2 ("17.29") are dropped, leaving the int and the string.
+    TokenizerFactory commaDropping = TokenizerFactory.builder().delimiter(",").drop(0, 2).build();
+    Pair<Integer, String> expected = Pair.of(1, "abc");
+    assertEquals(expected, xpair(commaDropping, xint(), xstring()).extract("foo,1,17.29,abc"));
+  }
+
+  @Test
+  public void testTripsWithSkip() {
+    // The leading "foo" is skipped before tokenizing on ';'.
+    TokenizerFactory skipping = TokenizerFactory.builder().delimiter(";").skip("^foo").build();
+    Tuple3<Integer, String, Float> expected = Tuple3.of(17, "abc", 3.4f);
+    assertEquals(expected,
+        xtriple(skipping, xint(), xstring(), xfloat()).extract("foo17;abc;3.4"));
+  }
+
+  @Test
+  public void testTripsWithKeep() {
+    // Only fields 1, 2 and 3 are kept; field 0 ("foo") is ignored.
+    TokenizerFactory keeping = TokenizerFactory.builder().delimiter(";").keep(1, 2, 3).build();
+    Tuple3<Integer, String, Float> expected = Tuple3.of(17, "abc", 3.4f);
+    assertEquals(expected,
+        xtriple(keeping, xint(), xstring(), xfloat()).extract("foo;17;abc;3.4"));
+  }
+
+  @Test
+  public void testQuadsWithWhitespace() {
+    TokenizerFactory whitespace = TokenizerFactory.getDefaultInstance();
+    Tuple4<Double, String, Boolean, Long> expected = Tuple4.of(1.3, "foo", true, 1L);
+    assertEquals(expected,
+        xquad(whitespace, xdouble(), xstring(), xboolean(), xlong()).extract("1.3 foo true 1"));
+  }
+
+  @Test
+  public void testTupleN() {
+    TokenizerFactory commas = TokenizerFactory.builder().delimiter(",").build();
+    TupleN expected = new TupleN(1, false, true, 2, 3);
+    assertEquals(expected,
+        xtupleN(commas, xint(), xboolean(), xboolean(), xint(), xint()).extract("1,false,true,2,3"));
+  }
+
+  @Test
+  public void testCollections() {
+    TokenizerFactory semicolons = TokenizerFactory.builder().delimiter(";").build();
+    // Use 3000 as the default for values we can't parse
+    Extractor<Collection<Integer>> ints = xcollect(semicolons, xint(3000));
+
+    // A clean record: no error is flagged.
+    assertEquals(ImmutableList.of(1, 2, 3), ints.extract("1;2;3"));
+    assertFalse(ints.errorOnLastRecord());
+
+    // The unparseable "a" is replaced by the default and counted as one error.
+    assertEquals(ImmutableList.of(17, 29, 3000), ints.extract("17;29;a"));
+    assertTrue(ints.errorOnLastRecord());
+    assertEquals(1, ints.getStats().getErrorCount());
+  }
+
+  @Test
+  public void testNestedComposites() {
+    // ';' separates the two composites, ',' separates the fields inside each of them.
+    TokenizerFactory outer = TokenizerFactory.builder().delimiter(";").build();
+    TokenizerFactory inner = TokenizerFactory.builder().delimiter(",").build();
+    Extractor<Pair<Pair<Long, Integer>, Tuple3<String, Integer, Float>>> extractor =
+        xpair(outer, xpair(inner, xlong(), xint()), xtriple(inner, xstring(), xint(), xfloat()));
+    Pair<Pair<Long, Integer>, Tuple3<String, Integer, Float>> expected =
+        Pair.of(Pair.of(1L, 2), Tuple3.of("a", 17, 29f));
+    assertEquals(expected, extractor.extract("1,2;a,17,29"));
+  }
+
+  @Test
+  public void testParse() {
+    TokenizerFactory commas = TokenizerFactory.builder().delimiter(",").build();
+    PCollection<String> lines = MemPipeline.typedCollectionOf(Avros.strings(), "1,3.0");
+    PCollection<Pair<Integer, Float>> parsed = Parse.parse("test", lines,
+        xpair(commas, xint(), xfloat()));
+    Iterable<Pair<Integer, Float>> materialized = parsed.materialize();
+    assertEquals(ImmutableList.of(Pair.of(1, 3.0f)), materialized);
+  }
+}
