Repository: crunch Updated Branches: refs/heads/master e884bf238 -> f629b8e6b
CRUNCH-563: Add support for BigDecimal aggregators. Contributed by Vasu Doppalapudi. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f629b8e6 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f629b8e6 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f629b8e6 Branch: refs/heads/master Commit: f629b8e6bc1bf9f0fc3f5f6e3c5d28a01c593855 Parents: e884bf2 Author: Josh Wills <[email protected]> Authored: Sun Sep 27 11:45:03 2015 -0700 Committer: Josh Wills <[email protected]> Committed: Sun Sep 27 11:45:03 2015 -0700 ---------------------------------------------------------------------- .../it/java/org/apache/crunch/WordCountIT.java | 41 ++++++++++++- crunch-core/src/it/resources/bigdecimal.txt | 6 ++ .../java/org/apache/crunch/fn/Aggregators.java | 64 ++++++++++++++++++++ .../java/org/apache/crunch/types/PTypes.java | 40 ++++++++++++ .../org/apache/crunch/fn/AggregatorsTest.java | 21 +++++++ 5 files changed, 170 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/f629b8e6/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java b/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java index c4e1d58..4c77c41 100644 --- a/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; +import java.math.BigDecimal; import java.nio.charset.Charset; import java.util.Arrays; import java.util.List; @@ -34,8 +35,10 @@ import org.apache.crunch.lib.Aggregate; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.PTypes; import org.apache.crunch.types.avro.AvroTypeFamily; import org.apache.crunch.types.writable.WritableTypeFamily; +import org.apache.crunch.types.writable.Writables; import org.junit.Rule; import org.junit.Test; @@ -68,7 +71,6 @@ public class WordCountIT { public static PTable<String, Long> substr(PTable<String, Long> ptable) { return ptable.parallelDo(new DoFn<Pair<String, Long>, Pair<String, Long>>() { - @Override public void process(Pair<String, Long> input, Emitter<Pair<String, Long>> emitter) { if (!input.first().isEmpty()) { @@ -77,9 +79,19 @@ public class WordCountIT { } }, ptable.getPTableType()); } + + public static PTable<String, BigDecimal> convDecimal(PCollection<String> ptable) { + return ptable.parallelDo(new DoFn<String, Pair<String, BigDecimal>>() { + @Override + public void process(String input, Emitter<Pair<String, BigDecimal>> emitter) { + emitter.emit(Pair.of(input.split("~")[0], new BigDecimal(input.split("~")[1]))); + } + }, Writables.tableOf(Writables.strings(), PTypes.bigDecimal(WritableTypeFamily.getInstance()))); + } private boolean runSecond = false; private boolean useToOutput = false; + private boolean testBigDecimal = false; @Test public void testWritables() throws IOException { @@ -98,6 +110,14 @@ public class WordCountIT { useToOutput = true; run(new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance()); } + + @Test + public void testWritablesForBigDecimal() throws IOException { + runSecond = false; + useToOutput = true; + testBigDecimal = true; + run(new MRPipeline(WordCountIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance()); + } @Test public void testAvro() throws IOException { @@ -149,10 +169,23 @@ public class WordCountIT { PTable<String, Long> we = substr(wordCount).groupByKey().combineValues(Aggregators.SUM_LONGS()); pipeline.writeTextFile(we, substrPath); } + + PTable<String, BigDecimal> bd = null; + if (testBigDecimal) { + String decimalInputPath = tmpDir.copyResourceFileName("bigdecimal.txt"); + PCollection<String> testBd = pipeline.read(At.textFile(decimalInputPath, typeFamily.strings())); + bd = convDecimal(testBd).groupByKey().combineValues(Aggregators.SUM_BIGDECIMALS()); + } + PipelineResult res = pipeline.done(); assertTrue(res.succeeded()); List<PipelineResult.StageResult> stageResults = res.getStageResults(); - if (runSecond) { + if (testBigDecimal) { + assertEquals(1, stageResults.size()); + assertEquals( + ImmutableList.of(Pair.of("A", bigDecimal("3.579")), Pair.of("B", bigDecimal("11.579")), + Pair.of("C", bigDecimal("15.642"))), Lists.newArrayList(bd.materialize())); + } else if (runSecond) { assertEquals(2, stageResults.size()); } else { assertEquals(1, stageResults.size()); @@ -170,4 +203,8 @@ public class WordCountIT { } assertTrue(passed); } + + private static BigDecimal bigDecimal(String value) { + return new BigDecimal(value); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/f629b8e6/crunch-core/src/it/resources/bigdecimal.txt ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/resources/bigdecimal.txt b/crunch-core/src/it/resources/bigdecimal.txt new file mode 100644 index 0000000..f4712d4 --- /dev/null +++ b/crunch-core/src/it/resources/bigdecimal.txt @@ -0,0 +1,6 @@ +A~1.234 +B~2.345 +C~7.321 +A~2.345 +B~9.234 +C~8.321 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/f629b8e6/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java index c5b0c21..cca3ddb 100644 --- a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java +++ b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java @@ -17,6 +17,7 @@ */ package org.apache.crunch.fn; +import java.math.BigDecimal; import java.math.BigInteger; import java.util.LinkedList; import java.util.List; @@ -109,6 +110,14 @@ public final class Aggregators { public static Aggregator<BigInteger> SUM_BIGINTS() { return new SumBigInts(); } + + /** + * Sum up all {@link BigDecimal} values. + * @return The newly constructed instance + */ + public static Aggregator<BigDecimal> SUM_BIGDECIMALS() { + return new SumBigDecimals(); + } /** * Return the maximum of all given {@link Comparable} values. @@ -207,6 +216,24 @@ public final class Aggregators { public static Aggregator<BigInteger> MAX_BIGINTS(int n) { return new MaxNAggregator<BigInteger>(n); } + + /** + * Return the maximum of all given {@link BigDecimal} values. + * @return The newly constructed instance + */ + public static Aggregator<BigDecimal> MAX_BIGDECIMALS() { + return new MaxComparables<BigDecimal>(); + } + + /** + * Return the {@code n} largest {@link BigDecimal} values (or fewer if there are fewer + * values than {@code n}). + * @param n The number of values to return + * @return The newly constructed instance + */ + public static Aggregator<BigDecimal> MAX_BIGDECIMALS(int n) { + return new MaxNAggregator<BigDecimal>(n); + } /** * Return the {@code n} largest values (or fewer if there are fewer @@ -327,6 +354,24 @@ public final class Aggregators { public static Aggregator<BigInteger> MIN_BIGINTS(int n) { return new MinNAggregator<BigInteger>(n); } + + /** + * Return the minimum of all given {@link BigDecimal} values. + * @return The newly constructed instance + */ + public static Aggregator<BigDecimal> MIN_BIGDECIMALS() { + return new MinComparables<BigDecimal>(); + } + + /** + * Return the {@code n} smallest {@link BigDecimal} values (or fewer if there are fewer + * values than {@code n}). + * @param n The number of values to return + * @return The newly constructed instance + */ + public static Aggregator<BigDecimal> MIN_BIGDECIMALS(int n) { + return new MinNAggregator<BigDecimal>(n); + } /** * Return the {@code n} smallest values (or fewer if there are fewer @@ -637,6 +682,25 @@ public final class Aggregators { return ImmutableList.of(sum); } } + + private static class SumBigDecimals extends SimpleAggregator<BigDecimal> { + private BigDecimal sum = BigDecimal.ZERO; + + @Override + public void reset() { + sum = BigDecimal.ZERO; + } + + @Override + public void update(BigDecimal next) { + sum = sum.add(next); + } + + @Override + public Iterable<BigDecimal> results() { + return ImmutableList.of(sum); + } + } private static class MaxComparables<C extends Comparable<C>> extends SimpleAggregator<C> { http://git-wip-us.apache.org/repos/asf/crunch/blob/f629b8e6/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java b/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java index 82604ac..8715e28 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java @@ -17,6 +17,7 @@ */ package org.apache.crunch.types; +import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.UUID; @@ -49,6 +50,13 @@ public class PTypes { public static PType<BigInteger> bigInt(PTypeFamily typeFamily) { return typeFamily.derivedImmutable(BigInteger.class, BYTE_TO_BIGINT, BIGINT_TO_BYTE, typeFamily.bytes()); } + + /** + * A PType for Java's {@link BigDecimal} type. + */ + public static PType<BigDecimal> bigDecimal(PTypeFamily typeFamily) { + return typeFamily.derivedImmutable(BigDecimal.class, BYTE_TO_BIGDECIMAL, BIGDECIMAL_TO_BYTE, typeFamily.bytes()); + } /** * A PType for Java's {@link UUID} type. @@ -115,6 +123,20 @@ public class PTypes { } }; + public static final MapFn<ByteBuffer, BigDecimal> BYTE_TO_BIGDECIMAL = new MapFn<ByteBuffer, BigDecimal>() { + @Override + public BigDecimal map(ByteBuffer input) { + return input == null ? null : byteBufferToBigDecimal(input); + } + }; + + public static final MapFn<BigDecimal, ByteBuffer> BIGDECIMAL_TO_BYTE = new MapFn<BigDecimal, ByteBuffer>() { + @Override + public ByteBuffer map(BigDecimal input) { + return input == null ? null : bigDecimalToByteBuffer(input); + } + }; + private static class JacksonInputMapFn<T> extends MapFn<String, T> { private final Class<T> clazz; @@ -298,4 +320,22 @@ public class PTypes { return bb; } }; + + private static BigDecimal byteBufferToBigDecimal(ByteBuffer input) { + int scale = input.getInt(); + byte[] bytes = new byte[input.remaining()]; + input.get(bytes, 0, input.remaining()); + BigInteger bi = new BigInteger(bytes); + BigDecimal bigDecValue = new BigDecimal(bi, scale); + return bigDecValue; + } + + private static ByteBuffer bigDecimalToByteBuffer(BigDecimal input) { + byte[] unScaledBytes = input.unscaledValue().toByteArray(); + byte[] scaleBytes = ByteBuffer.allocate(4).putInt(input.scale()).array(); + byte[] bytes = new byte[scaleBytes.length + unScaledBytes.length]; + System.arraycopy(scaleBytes, 0, bytes, 0, scaleBytes.length); + System.arraycopy(unScaledBytes, 0, bytes, scaleBytes.length, unScaledBytes.length); + return ByteBuffer.wrap(bytes); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/f629b8e6/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java index 57dc8f0..973cbb1 100644 --- a/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java @@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import java.math.BigDecimal; import java.math.BigInteger; import java.util.Arrays; import java.util.List; @@ -51,6 +52,7 @@ public class AggregatorsTest { assertThat(sapply(SUM_FLOATS(), 1f, 2f, 3f, -4f), is(2f)); assertThat(sapply(SUM_DOUBLES(), 0.1, 0.2, 0.3), is(closeTo(0.6, 0.00001))); assertThat(sapply(SUM_BIGINTS(), bigInt("7"), bigInt("3")), is(bigInt("10"))); + assertThat(sapply(SUM_BIGDECIMALS(), bigDecimal("1.122"), bigDecimal("0.654")), is(bigDecimal("1.776"))); } @Test @@ -61,6 +63,7 @@ public class AggregatorsTest { assertThat(sapply(SUM_FLOATS(), 29f, 17f, 1729f), is(1775.0f)); assertThat(sapply(SUM_DOUBLES(), 29.0, 17.0, 1729.0), is(1775.0)); assertThat(sapply(SUM_BIGINTS(), bigInt("29"), bigInt("17"), bigInt("1729")), is(bigInt("1775"))); + assertThat(sapply(SUM_BIGDECIMALS(), bigDecimal("29.987"), bigDecimal("17.876"), bigDecimal("1729.876")), is(bigDecimal("1777.739"))); } @Test @@ -71,6 +74,7 @@ public class AggregatorsTest { assertThat(sapply(MAX_DOUBLES(), 29.0, 17.0, 1729.0), is(1729.0)); assertThat(sapply(MAX_FLOATS(), 29f, 1745f, 17f, 1729f), is(1745.0f)); assertThat(sapply(MAX_BIGINTS(), bigInt("29"), bigInt("17"), bigInt("1729")), is(bigInt("1729"))); + assertThat(sapply(MAX_BIGDECIMALS(), bigDecimal("29.987"), bigDecimal("17.876"), bigDecimal("1729.876")), is(bigDecimal("1729.876"))); assertThat(sapply(Aggregators.<String>MAX_COMPARABLES(), "b", "a", "d", "c"), is("d")); } @@ -82,6 +86,7 @@ public class AggregatorsTest { assertThat(sapply(MIN_DOUBLES(), 29.0, 17.0, 1729.0), is(17.0)); assertThat(sapply(MIN_INTS(), 29, 170, 1729), is(29)); assertThat(sapply(MIN_BIGINTS(), bigInt("29"), bigInt("17"), bigInt("1729")), is(bigInt("17"))); + assertThat(sapply(MIN_BIGDECIMALS(), bigDecimal("29.987"), bigDecimal("17.876"), bigDecimal("1729.876")), is(bigDecimal("17.876"))); assertThat(sapply(Aggregators.<String>MIN_COMPARABLES(), "b", "a", "d", "c"), is("a")); } @@ -153,8 +158,14 @@ public class AggregatorsTest { Tuple3.of(3.0f, 1.2, 3.14), Tuple3.of(-1.0f, 14.5, -0.98)); Aggregator<Tuple3<Float, Double, Double>> a = Aggregators.tripAggregator( MAX_FLOATS(), MAX_DOUBLES(), MIN_DOUBLES()); + + List<Tuple3<Float, BigDecimal, BigDecimal>> input1 = ImmutableList.of(Tuple3.of(17.29f, bigDecimal("12.2"), bigDecimal("0.1")), + Tuple3.of(3.0f, bigDecimal("1.2"), bigDecimal("3.14")), Tuple3.of(-1.0f, bigDecimal("14.5"), bigDecimal("-0.98"))); + Aggregator<Tuple3<Float, BigDecimal, BigDecimal>> b = Aggregators.tripAggregator( + MAX_FLOATS(), MAX_BIGDECIMALS(), MIN_BIGDECIMALS()); assertThat(sapply(a, input), is(Tuple3.of(17.29f, 14.5, -0.98))); + assertThat(sapply(b, input1), is(Tuple3.of(17.29f, bigDecimal("14.5"), bigDecimal("-0.98")))); } @Test @@ -163,8 +174,14 @@ public class AggregatorsTest { Tuple4.of(3.0f, 1.2, 3.14, 2), Tuple4.of(-1.0f, 14.5, -0.98, 3)); Aggregator<Tuple4<Float, Double, Double, Integer>> a = Aggregators.quadAggregator( MAX_FLOATS(), MAX_DOUBLES(), MIN_DOUBLES(), SUM_INTS()); + + List<Tuple4<BigDecimal, Double, Double, Integer>> input1 = ImmutableList.of(Tuple4.of(bigDecimal("17.29"), 12.2, 0.1, 1), + Tuple4.of(bigDecimal("3.0"), 1.2, 3.14, 2), Tuple4.of(bigDecimal("-1.0"), 14.5, -0.98, 3)); + Aggregator<Tuple4<BigDecimal, Double, Double, Integer>> b = Aggregators.quadAggregator( + MAX_BIGDECIMALS(), MAX_DOUBLES(), MIN_DOUBLES(), SUM_INTS()); assertThat(sapply(a, input), is(Tuple4.of(17.29f, 14.5, -0.98, 6))); + assertThat(sapply(b, input1), is(Tuple4.of(bigDecimal("17.29"), 14.5, -0.98, 6))); } @Test @@ -233,4 +250,8 @@ public class AggregatorsTest { private static BigInteger bigInt(String value) { return new BigInteger(value); } + + private static BigDecimal bigDecimal(String value) { + return new BigDecimal(value); + } }
