http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 52b2f5e..26904aa 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -1625,7 +1625,7 @@ public class ParDoTest implements Serializable { new DoFn<KV<String, Integer>, Integer>() { @StateId(stateId) - private final StateSpec<Object, ValueState<Integer>> intState = + private final StateSpec<ValueState<Integer>> intState = StateSpecs.value(VarIntCoder.of()); @ProcessElement @@ -1654,7 +1654,7 @@ public class ParDoTest implements Serializable { new DoFn<KV<Integer, Integer>, Integer>() { @StateId(stateId) - private final StateSpec<Object, ValueState<Integer>> seenSpec = + private final StateSpec<ValueState<Integer>> seenSpec = StateSpecs.value(VarIntCoder.of()); @ProcessElement @@ -1704,7 +1704,7 @@ public class ParDoTest implements Serializable { new DoFn<KV<String, Integer>, MyInteger>() { @StateId(stateId) - private final StateSpec<Object, ValueState<MyInteger>> intState = + private final StateSpec<ValueState<MyInteger>> intState = StateSpecs.value(); @ProcessElement @@ -1734,7 +1734,7 @@ public class ParDoTest implements Serializable { new DoFn<KV<String, Integer>, MyInteger>() { @StateId(stateId) - private final StateSpec<Object, ValueState<MyInteger>> intState = + private final StateSpec<ValueState<MyInteger>> intState = StateSpecs.value(); @ProcessElement @@ -1765,7 +1765,7 @@ public class ParDoTest implements Serializable { new DoFn<KV<String, MyInteger>, MyInteger>() { @StateId(stateId) - private final StateSpec<Object, ValueState<MyInteger>> intState = + private final StateSpec<ValueState<MyInteger>> intState = StateSpecs.value(); @ProcessElement @@ -1797,7 +1797,7 @@ public class ParDoTest implements Serializable { new DoFn<KV<String, Integer>, List<MyInteger>>() { @StateId(stateId) - private final StateSpec<Object, ValueState<List<MyInteger>>> intState = + private final StateSpec<ValueState<List<MyInteger>>> intState = StateSpecs.value(); @ProcessElement @@ -1828,7 +1828,7 @@ public class ParDoTest implements Serializable { new DoFn<KV<String, Integer>, Integer>() { @StateId(stateId) - private final StateSpec<Object, ValueState<Integer>> intState = + private final StateSpec<ValueState<Integer>> intState = StateSpecs.value(VarIntCoder.of()); @ProcessElement @@ -1876,7 +1876,7 @@ public class ParDoTest implements Serializable { new DoFn<KV<String, Integer>, KV<String, Integer>>() { @StateId(stateId) - private final StateSpec<Object, ValueState<Integer>> intState = + private final StateSpec<ValueState<Integer>> intState = StateSpecs.value(VarIntCoder.of()); @ProcessElement @@ -1892,7 +1892,7 @@ public class ParDoTest implements Serializable { new DoFn<KV<String, Integer>, Integer>() { @StateId(stateId) - private final StateSpec<Object, ValueState<Integer>> intState = + private final StateSpec<ValueState<Integer>> intState = StateSpecs.value(VarIntCoder.of()); @ProcessElement @@ -1929,7 +1929,7 @@ public class ParDoTest implements Serializable { new DoFn<KV<String, Integer>, Integer>() { @StateId(stateId) - private final StateSpec<Object, ValueState<Integer>> intState = + private final StateSpec<ValueState<Integer>> intState = StateSpecs.value(VarIntCoder.of()); @ProcessElement @@ -1976,7 +1976,7 @@ public class ParDoTest implements Serializable { new DoFn<KV<String, Integer>, List<Integer>>() { @StateId(stateId) - private final StateSpec<Object, BagState<Integer>> bufferState = + private final StateSpec<BagState<Integer>> bufferState = StateSpecs.bag(VarIntCoder.of()); @ProcessElement @@ -2013,7 +2013,7 @@ public class ParDoTest implements Serializable { new DoFn<KV<String, Integer>, List<MyInteger>>() { @StateId(stateId) - private final StateSpec<Object, BagState<MyInteger>> bufferState = + private final StateSpec<BagState<MyInteger>> bufferState = StateSpecs.bag(); @ProcessElement @@ -2051,7 +2051,7 @@ public class ParDoTest implements Serializable { new DoFn<KV<String, Integer>, List<MyInteger>>() { @StateId(stateId) - private final StateSpec<Object, BagState<MyInteger>> bufferState = + private final StateSpec<BagState<MyInteger>> bufferState = StateSpecs.bag(); @ProcessElement @@ -2088,10 +2088,10 @@ public class ParDoTest implements Serializable { new DoFn<KV<String, Integer>, Set<Integer>>() { @StateId(stateId) - private final StateSpec<Object, SetState<Integer>> setState = + private final StateSpec<SetState<Integer>> setState = StateSpecs.set(VarIntCoder.of()); @StateId(countStateId) - private final StateSpec<Object, CombiningState<Integer, int[], Integer>> + private final StateSpec<CombiningState<Integer, int[], Integer>> countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers()); @@ -2132,10 +2132,10 @@ public class ParDoTest implements Serializable { new DoFn<KV<String, Integer>, Set<MyInteger>>() { @StateId(stateId) - private final StateSpec<Object, SetState<MyInteger>> setState = StateSpecs.set(); + private final StateSpec<SetState<MyInteger>> setState = StateSpecs.set(); @StateId(countStateId) - private final StateSpec<Object, CombiningState<Integer, int[], Integer>> + private final StateSpec<CombiningState<Integer, int[], Integer>> countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers()); @@ -2175,10 +2175,10 @@ public class ParDoTest implements Serializable { new DoFn<KV<String, Integer>, Set<MyInteger>>() { @StateId(stateId) - private final StateSpec<Object, SetState<MyInteger>> setState = StateSpecs.set(); + private final StateSpec<SetState<MyInteger>> setState = StateSpecs.set(); @StateId(countStateId) - private final StateSpec<Object, CombiningState<Integer, int[], Integer>> + private final StateSpec<CombiningState<Integer, int[], Integer>> countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers()); @@ -2217,10 +2217,10 @@ public class ParDoTest implements Serializable { new DoFn<KV<String, KV<String, Integer>>, KV<String, Integer>>() { @StateId(stateId) - private final StateSpec<Object, MapState<String, Integer>> mapState = + private final StateSpec<MapState<String, Integer>> mapState = StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of()); @StateId(countStateId) - private final StateSpec<Object, CombiningState<Integer, int[], Integer>> + private final StateSpec<CombiningState<Integer, int[], Integer>> countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers()); @@ -2264,9 +2264,10 @@ public class ParDoTest implements Serializable { new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() { @StateId(stateId) - private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map(); + private final StateSpec<MapState<String, MyInteger>> mapState = StateSpecs.map(); + @StateId(countStateId) - private final StateSpec<Object, CombiningState<Integer, int[], Integer>> + private final StateSpec<CombiningState<Integer, int[], Integer>> countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers()); @@ -2310,9 +2311,10 @@ public class ParDoTest implements Serializable { new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() { @StateId(stateId) - private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map(); + private final StateSpec<MapState<String, MyInteger>> mapState = StateSpecs.map(); + @StateId(countStateId) - private final StateSpec<Object, CombiningState<Integer, int[], Integer>> + private final StateSpec<CombiningState<Integer, int[], Integer>> countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers()); @@ -2356,16 +2358,13 @@ public class ParDoTest implements Serializable { private static final double EPSILON = 0.0001; @StateId(stateId) - private final StateSpec< - Object, CombiningState<Double, CountSum<Double>, Double>> - combiningState = - StateSpecs.combining(new Mean.CountSumCoder<Double>(), Mean.<Double>of()); + private final StateSpec<CombiningState<Double, CountSum<Double>, Double>> combiningState = + StateSpecs.combining(new Mean.CountSumCoder<Double>(), Mean.<Double>of()); @ProcessElement public void processElement( ProcessContext c, - @StateId(stateId) - CombiningState<Double, CountSum<Double>, Double> state) { + @StateId(stateId) CombiningState<Double, CountSum<Double>, Double> state) { state.add(c.element().getValue()); Double currentValue = state.read(); if (Math.abs(currentValue - 0.5) < EPSILON) { @@ -2396,40 +2395,38 @@ public class ParDoTest implements Serializable { private static final int EXPECTED_SUM = 16; @StateId(stateId) - private final StateSpec< - Object, CombiningState<Integer, MyInteger, Integer>> - combiningState = - StateSpecs.combining(new Combine.CombineFn<Integer, MyInteger, Integer>() { - @Override - public MyInteger createAccumulator() { - return new MyInteger(0); - } - - @Override - public MyInteger addInput(MyInteger accumulator, Integer input) { - return new MyInteger(accumulator.getValue() + input); - } - - @Override - public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) { - int newValue = 0; - for (MyInteger myInteger : accumulators) { - newValue += myInteger.getValue(); - } - return new MyInteger(newValue); - } + private final StateSpec<CombiningState<Integer, MyInteger, Integer>> combiningState = + StateSpecs.combining( + new Combine.CombineFn<Integer, MyInteger, Integer>() { + @Override + public MyInteger createAccumulator() { + return new MyInteger(0); + } + + @Override + public MyInteger addInput(MyInteger accumulator, Integer input) { + return new MyInteger(accumulator.getValue() + input); + } + + @Override + public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) { + int newValue = 0; + for (MyInteger myInteger : accumulators) { + newValue += myInteger.getValue(); + } + return new MyInteger(newValue); + } - @Override - public Integer extractOutput(MyInteger accumulator) { - return accumulator.getValue(); - } - }); + @Override + public Integer extractOutput(MyInteger accumulator) { + return accumulator.getValue(); + } + }); @ProcessElement public void processElement( ProcessContext c, - @StateId(stateId) - CombiningState<Integer, MyInteger, Integer> state) { + @StateId(stateId) CombiningState<Integer, MyInteger, Integer> state) { state.add(c.element().getValue()); Integer currentValue = state.read(); if (currentValue == EXPECTED_SUM) { @@ -2458,40 +2455,38 @@ public class ParDoTest implements Serializable { private static final int EXPECTED_SUM = 16; @StateId(stateId) - private final StateSpec< - Object, CombiningState<Integer, MyInteger, Integer>> - combiningState = - StateSpecs.combining(new Combine.CombineFn<Integer, MyInteger, Integer>() { - @Override - public MyInteger createAccumulator() { - return new MyInteger(0); - } - - @Override - public MyInteger addInput(MyInteger accumulator, Integer input) { - return new MyInteger(accumulator.getValue() + input); - } - - @Override - public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) { - int newValue = 0; - for (MyInteger myInteger : accumulators) { - newValue += myInteger.getValue(); - } - return new MyInteger(newValue); - } + private final StateSpec<CombiningState<Integer, MyInteger, Integer>> combiningState = + StateSpecs.combining( + new Combine.CombineFn<Integer, MyInteger, Integer>() { + @Override + public MyInteger createAccumulator() { + return new MyInteger(0); + } + + @Override + public MyInteger addInput(MyInteger accumulator, Integer input) { + return new MyInteger(accumulator.getValue() + input); + } + + @Override + public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) { + int newValue = 0; + for (MyInteger myInteger : accumulators) { + newValue += myInteger.getValue(); + } + return new MyInteger(newValue); + } - @Override - public Integer extractOutput(MyInteger accumulator) { - return accumulator.getValue(); - } - }); + @Override + public Integer extractOutput(MyInteger accumulator) { + return accumulator.getValue(); + } + }); @ProcessElement public void processElement( ProcessContext c, - @StateId(stateId) - CombiningState<Integer, MyInteger, Integer> state) { + @StateId(stateId) CombiningState<Integer, MyInteger, Integer> state) { state.add(c.element().getValue()); Integer currentValue = state.read(); if (currentValue == EXPECTED_SUM) { @@ -2523,7 +2518,7 @@ public class ParDoTest implements Serializable { new DoFn<KV<String, Integer>, List<Integer>>() { @StateId(stateId) - private final StateSpec<Object, BagState<Integer>> bufferState = + private final StateSpec<BagState<Integer>> bufferState = StateSpecs.bag(VarIntCoder.of()); @ProcessElement @@ -2697,7 +2692,7 @@ public class ParDoTest implements Serializable { private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); @StateId(stateId) - private final StateSpec<Object, ValueState<String>> stateSpec = + private final StateSpec<ValueState<String>> stateSpec = StateSpecs.value(StringUtf8Coder.of()); @ProcessElement
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 5732438..c16eea2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -188,7 +188,7 @@ public class DoFnInvokersTest { class MockFn extends DoFn<String, String> { @StateId(stateId) - private final StateSpec<Object, ValueState<Integer>> spec = + private final StateSpec<ValueState<Integer>> spec = StateSpecs.value(VarIntCoder.of()); @ProcessElement http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java index e1fa2d1..d6cc4f6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java @@ -542,11 +542,11 @@ public class DoFnSignaturesTest { DoFnSignatures.getSignature( new DoFn<KV<String, Integer>, Long>() { @StateId("my-id") - private final StateSpec<Object, ValueState<Integer>> myfield1 = + private final StateSpec<ValueState<Integer>> myfield1 = StateSpecs.value(VarIntCoder.of()); @StateId("my-id") - private final StateSpec<Object, ValueState<Long>> myfield2 = + private final StateSpec<ValueState<Long>> myfield2 = StateSpecs.value(VarLongCoder.of()); @ProcessElement @@ -565,7 +565,7 @@ public class DoFnSignaturesTest { DoFnSignatures.getSignature( new DoFn<KV<String, Integer>, Long>() { @StateId("my-id") - private StateSpec<Object, ValueState<Integer>> myfield = + private StateSpec<ValueState<Integer>> myfield = StateSpecs.value(VarIntCoder.of()); @ProcessElement @@ -618,7 +618,7 @@ public class DoFnSignaturesTest { DoFnSignatures.getSignature( new DoFn<KV<String, Integer>, Long>() { @StateId("my-id") - private final StateSpec<Object, ValueState<Integer>> myfield = + private final StateSpec<ValueState<Integer>> myfield = StateSpecs.value(VarIntCoder.of()); @ProcessElement @@ -644,7 +644,7 @@ public class DoFnSignaturesTest { DoFnSignatures.getSignature( new DoFn<KV<String, Integer>, Long>() { @StateId("my-id") - private final StateSpec<Object, ValueState<Integer>> myfield = + private final StateSpec<ValueState<Integer>> myfield = StateSpecs.value(VarIntCoder.of()); @ProcessElement @@ -668,7 +668,7 @@ public class DoFnSignaturesTest { DoFnSignatures.getSignature( new DoFn<KV<String, Integer>, Long>() { @StateId("my-id") - private final StateSpec<Object, ValueState<Integer>> myfield = + private final StateSpec<ValueState<Integer>> myfield = StateSpecs.value(VarIntCoder.of()); @ProcessElement @@ -683,7 +683,7 @@ public class DoFnSignaturesTest { DoFnSignatures.getSignature( new DoFn<KV<String, Integer>, Long>() { @StateId("foo") - private final StateSpec<Object, ValueState<Integer>> bizzle = + private final StateSpec<ValueState<Integer>> bizzle = StateSpecs.value(VarIntCoder.of()); @ProcessElement @@ -728,7 +728,7 @@ public class DoFnSignaturesTest { DoFnSignatures.getSignature( new DoFnUsingState() { @StateId(DoFnUsingState.STATE_ID) - private final StateSpec<Object, ValueState<Integer>> spec = + private final StateSpec<ValueState<Integer>> spec = StateSpecs.value(VarIntCoder.of()); }.getClass()); } @@ -770,7 +770,7 @@ public class DoFnSignaturesTest { DoFnSignatures.getSignature( new DoFn<KV<String, Integer>, Long>() { @StateId("foo") - private final StateSpec<Object, ValueState<Integer>> bizzleDecl = + private final StateSpec<ValueState<Integer>> bizzleDecl = StateSpecs.value(VarIntCoder.of()); @ProcessElement @@ -803,7 +803,7 @@ public class DoFnSignaturesTest { public void testSimpleStateIdNamedDoFn() throws Exception { class DoFnForTestSimpleStateIdNamedDoFn extends DoFn<KV<String, Integer>, Long> { @StateId("foo") - private final StateSpec<Object, ValueState<Integer>> bizzle = + private final StateSpec<ValueState<Integer>> bizzle = StateSpecs.value(VarIntCoder.of()); @ProcessElement @@ -831,7 +831,7 @@ public class DoFnSignaturesTest { // Note that in order to have a coder for T it will require initialization in the constructor, // but that isn't important for this test @StateId("foo") - private final StateSpec<Object, ValueState<T>> bizzle = null; + private final StateSpec<ValueState<T>> bizzle = null; @ProcessElement public void foo(ProcessContext context) {} @@ -866,7 +866,7 @@ public class DoFnSignaturesTest { public static final String STATE_ID = "my-state-id"; @StateId(STATE_ID) - private final StateSpec<Object, ValueState<Integer>> bizzle = + private final StateSpec<ValueState<Integer>> bizzle = StateSpecs.value(VarIntCoder.of()); } @@ -882,7 +882,7 @@ public class DoFnSignaturesTest { public static final String STATE_ID = "my-state-id"; @StateId(STATE_ID) - private final StateSpec<Object, ValueState<String>> myStateSpec = + private final StateSpec<ValueState<String>> myStateSpec = StateSpecs.value(StringUtf8Coder.of()); @ProcessElement http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java index 9714d72..9b79d11 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java @@ -59,7 +59,7 @@ public class FakeStepContext implements StepContext { } @Override - public StateInternals<?> stateInternals() { + public StateInternals stateInternals() { throw new UnsupportedOperationException(); }
