[ https://issues.apache.org/jira/browse/BEAM-5409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
haden lee updated BEAM-5409:
----------------------------
Description:
I may be missing something obvious, but for some reason I can't make {{PAssert}} & {{TestPipeline}} work with {{CoGroupByKey}}; without the {{CoGroupByKey}} step, everything works fine.
Here is a reference test file that reproduces the issue I'm facing. I tested with both Beam SDK 2.4 and 2.5.
([For the record, this was posted on StackOverflow before|https://stackoverflow.com/questions/51334429/beam-java-sdk-2-4-2-5-passert-with-cogroupbykey].)
For comparison, {{testWorking}} works as intended, while {{testBroken}} has this additional step:
{code:java}
// The following lines cause the issue.
PCollectionTuple tuple =
    KeyedPCollectionTuple
        .of(inTag1, pc1.apply("String to KV<String, String>", ParDo.of(new String2KV())))
        .and(inTag2, pc2)
        .apply("CoGroupByKey", CoGroupByKey.<String>create())
        .apply("Some Merge DoFn",
            ParDo.of(new MergeDoFn(inTag1, inTag2, outTag2))
                .withOutputTags(outTag1, TupleTagList.of(outTag2)));
{code}
The full error appears after the code below.
Has anyone run into a similar issue with {{TestPipeline}} before? I haven't tested extensively yet, but I couldn't find any relevant information on using {{CoGroupByKey}} and {{TestPipeline}} together. In production, the same code works fine for my team; we wanted to add a few unit tests using {{TestPipeline}} and {{PAssert}}, and that's how we ran into this issue.
Any help will be appreciated!
{code:java}
public class ReferenceTest {
  @Rule
  public final transient TestPipeline pipe1 = TestPipeline.create();
  @Rule
  public final transient TestPipeline pipe2 = TestPipeline.create();

  public static class String2KV extends DoFn<String, KV<String, String>> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      // "key1:value1" -> ["key1", "value1"]
      String[] tokens = c.element().split(":");
      c.output(KV.of(tokens[0], tokens[1]));
    }
  }

  public static class MergeDoFn extends DoFn<KV<String, CoGbkResult>, String> {
    final TupleTag<String> inTag1;
    final TupleTag<String> inTag2;
    final TupleTag<String> outTag2;

    public MergeDoFn(TupleTag<String> inTag1, TupleTag<String> inTag2,
        TupleTag<String> outTag2) {
      this.inTag1 = inTag1;
      this.inTag2 = inTag2;
      this.outTag2 = outTag2;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      String val1 = c.element().getValue().getOnly(inTag1);
      String val2 = c.element().getValue().getOnly(inTag2);
      // outTag1 = main output
      // outTag2 = side output
      c.output(outTag2, val1 + "," + val2);
    }
  }

  @Test
  public void testWorking() {
    // Create two PCollections for the test.
    PCollection<String> pc1 =
        pipe1.apply("create pc1",
            Create.<String>of("key1:value1").withCoder(StringUtf8Coder.of()));
    PCollection<KV<String, String>> pc2 =
        pipe1.apply("create pc2",
            Create.<KV<String, String>>of(KV.of("key1", "key1:value2"))
                .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));
    // Sanity check.
    PAssert.that(pc1).containsInAnyOrder("key1:value1");
    PAssert.that(pc2).containsInAnyOrder(KV.of("key1", "key1:value2"));
    pipe1.run();
  }

  @Test
  public void testBroken() {
    // Create two PCollections for the test.
    PCollection<String> pc1 =
        pipe2.apply("create pc1",
            Create.<String>of("key1:value1").withCoder(StringUtf8Coder.of()));
    PCollection<KV<String, String>> pc2 =
        pipe2.apply("create pc2",
            Create.<KV<String, String>>of(KV.of("key1", "key1:value2"))
                .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));
    // Sanity check.
    PAssert.that(pc1).containsInAnyOrder("key1:value1");
    PAssert.that(pc2).containsInAnyOrder(KV.of("key1", "key1:value2"));

    TupleTag<String> inTag1 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };
    TupleTag<String> inTag2 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };
    TupleTag<String> outTag1 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };
    TupleTag<String> outTag2 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };

    // The following lines cause the issue.
    PCollectionTuple tuple =
        KeyedPCollectionTuple
            .of(inTag1, pc1.apply("String to KV<String, String>", ParDo.of(new String2KV())))
            .and(inTag2, pc2)
            .apply("CoGroupByKey", CoGroupByKey.<String>create())
            .apply("Some Merge DoFn",
                ParDo.of(new MergeDoFn(inTag1, inTag2, outTag2))
                    .withOutputTags(outTag1, TupleTagList.of(outTag2)));

    // Without the following two PAsserts, the CoGBK step above seems to cause an issue.
    PAssert.that(tuple.get(outTag1)).empty();
    PAssert.that(tuple.get(outTag2)).containsInAnyOrder("value1,value2");
    pipe2.run();
  }
}
{code}
Here's the error:
{code:java}
java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.transforms.join.CoGbkResult$CoGbkResultCoder@217aa4f
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:57)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toCustomCoder(CoderTranslation.java:121)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toProto(CoderTranslation.java:85)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.SdkComponents.registerCoder(SdkComponents.java:183)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.registerComponents(CoderTranslation.java:107)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toKnownCoder(CoderTranslation.java:91)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toProto(CoderTranslation.java:83)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.SdkComponents.registerCoder(SdkComponents.java:183)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.PCollectionTranslation.toProto(PCollectionTranslation.java:36)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.SdkComponents.registerPCollection(SdkComponents.java:138)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:173)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:515)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:525)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformMatchers$4.matches(PTransformMatchers.java:194)
    at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:278)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
    at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:256)
    at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:209)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:173)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:353)
    at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:335)
    at exp.moloco.dataflow2.ReferenceTest.testBroken(ReferenceTest.java:110)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:324)
    at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:324)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.io.NotSerializableException: exp.moloco.dataflow2.ReferenceTest
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at java.util.HashMap.internalWriteEntries(HashMap.java:1789)
    at java.util.HashMap.writeObject(HashMap.java:1363)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1128)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
    ... 54 more
Process finished with exit code 255
{code}
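For what it's worth, the {{Caused by: java.io.NotSerializableException: exp.moloco.dataflow2.ReferenceTest}} line points at a plain Java pitfall rather than anything CoGBK-specific: the {{new TupleTag<String>() {...}}} instances in {{testBroken}} are anonymous inner classes created inside an instance method, so each one holds an implicit reference to the enclosing {{ReferenceTest}} instance, and serializing the {{CoGbkResultCoder}} (which carries those tags) drags the non-serializable test class into the object graph. A minimal sketch of that Java behavior, independent of Beam ({{CaptureDemo}} and {{makeAnonymous}} are hypothetical names for illustration):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

// Deliberately NOT Serializable, like a typical test class.
public class CaptureDemo {

    Serializable makeAnonymous() {
        // An anonymous inner class created in an instance method keeps an
        // implicit reference to the enclosing CaptureDemo instance, even
        // though ArrayList itself is Serializable.
        return new ArrayList<String>() {};
    }

    public static void main(String[] args) throws IOException {
        Serializable s = new CaptureDemo().makeAnonymous();
        try {
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(s);
            System.out.println("serialized OK");
        } catch (NotSerializableException e) {
            // The hidden enclosing-instance field fails to serialize.
            System.out.println("NotSerializableException: " + e.getMessage());
        }
    }
}
```

If that is indeed what is happening here, the usual workaround is to create the tags in a static context, e.g. as {{static final TupleTag<String>}} class-level fields, so no test-instance reference is captured.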
> Beam Java SDK 2.4/2.5 PAssert with CoGroupByKey
> -----------------------------------------------
>
> Key: BEAM-5409
> URL: https://issues.apache.org/jira/browse/BEAM-5409
> Project: Beam
> Issue Type: Bug
> Components: testing
> Affects Versions: 2.4.0, 2.5.0
> Reporter: haden lee
> Assignee: Jason Kuster
> Priority: Major
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)