Victor Kotai created BEAM-3334:
----------------------------------
Summary: NullPointerException in Direct Runner
Key: BEAM-3334
URL: https://issues.apache.org/jira/browse/BEAM-3334
Project: Beam
Issue Type: Bug
Components: runner-direct
Affects Versions: 2.2.0
Reporter: Victor Kotai
Assignee: Thomas Groh
We're currently migrating one of our pipelines from dataflow-sdk-1.9.1 to
beam-2.2.0. To validate the behavior of the flow, we have integration tests
that read a file containing TableRows in JSON representation and then run the
pipeline transformations on the resulting PCollection.
Example of the rows:
```
{"id":"firstId","type":"messageType","ts":"1500907784699", "message":"someMessage"}
{"id":"secondId","type":"messageType","ts":"1500907964217", "message":null}
```
In the test we've set up a pipeline that uses the `DirectRunner`; it reads a
PCollection from a file and applies some operations to it.
The code to read from a file now looks like this:
```
public static PCollection<TableRow> readRowsFromFile(Pipeline pipeline, String filePath) {
    return pipeline.apply(TextIO.read().from(Utils.absolutePathOf(filePath)))
        .apply(ParDo.of(new ParseTableRowFromJson()));
}
```
`ParseTableRowFromJson` looks like this:
```
public class ParseTableRowFromJson extends DoFn<String, TableRow> {
    private static Logger LOG = LoggerFactory.getLogger(ParseTableRowFromJson.class);

    @ProcessElement
    public void processElement(ProcessContext processContext) {
        String input = processContext.element();
        try {
            ByteArrayInputStream inStream = new ByteArrayInputStream(input.getBytes("UTF-8"));
            processContext.output(TableRowJsonCoder.of().decode(inStream, OUTER));
        } catch (IOException e) {
            LOG.warn("Failed parsing tableRow json: {}", input);
        }
    }
}
```
Or, with a different implementation that does essentially the same thing:
```
private static final ObjectMapper MAPPER = new ObjectMapper();

@ProcessElement
public void processElement(ProcessContext processContext) {
    String input = processContext.element();
    try {
        processContext.output(MAPPER.readValue(input, TableRow.class));
    } catch (IOException e) {
        LOG.warn("Failed parsing tableRow json: {}", input);
    }
}
```
When we run this on the given input we get the following:
```
Caused by: java.lang.NullPointerException
    at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
    at java.util.AbstractMap.hashCode(AbstractMap.java:530)
    at java.util.Arrays.hashCode(Arrays.java:4146)
    at java.util.Objects.hash(Objects.java:128)
    at org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow.hashCode(WindowedValue.java:284)
    at java.util.HashMap.hash(HashMap.java:338)
    at java.util.HashMap.get(HashMap.java:556)
    at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:191)
    at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractSetMultimap.put(AbstractSetMultimap.java:130)
    at org.apache.beam.runners.direct.repackaged.com.google.common.collect.HashMultimap.put(HashMultimap.java:48)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:110)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
    at ParseTableRowFromJson.processElement(ParseTableRowFromJson.java:22)
```
The NPE is thrown by `ArrayMap$Entry.hashCode`, whose implementation does not
handle a `null` value. It occurs on the TableRow that contains a `null` value
for `message` (the second row in the example above).
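To illustrate the kind of failure described above, here is a minimal sketch. It is not the google-http-java-client source: `EntryHashSketch`, `unsafeHash` and `safeHash` are hypothetical names, and `unsafeHash` only assumes that the failing code calls `hashCode()` directly on the entry's value. The null-safe variant follows the documented `java.util.Map.Entry#hashCode` contract, in which a `null` key or value contributes 0.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical illustration only; not the actual ArrayMap implementation. */
final class EntryHashSketch {

    /** Assumed failing pattern: hashCode() is called directly on a possibly-null value. */
    static int unsafeHash(Object key, Object value) {
        return key.hashCode() ^ value.hashCode(); // throws NullPointerException if value is null
    }

    /** Null-safe variant: Objects.hashCode returns 0 for null, as Map.Entry#hashCode specifies. */
    static int safeHash(Object key, Object value) {
        return Objects.hashCode(key) ^ Objects.hashCode(value);
    }

    public static void main(String[] args) {
        // Reference entry from the JDK, holding the same null value as the failing row.
        Map.Entry<String, Object> reference = new AbstractMap.SimpleEntry<>("message", null);
        System.out.println(safeHash("message", null) == reference.hashCode());

        try {
            unsafeHash("message", null);
        } catch (NullPointerException e) {
            System.out.println("unsafeHash threw NullPointerException");
        }
    }
}
```

With a hash like `safeHash`, a row such as `{"message":null}` could be hashed by the runner's immutability check without throwing.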
I've PRed a change into the library that should fix this: [Issue
here|https://github.com/google/google-http-java-client/issues/384]
I would guess it's now a matter of the library cutting a release and Beam
updating its dependency to it? Or is there anything else that should be done?
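Until a fixed release is available, one possible stopgap in the test might be to drop `null`-valued fields before emitting each row, so that no entry with a `null` value reaches the runner's hashing. The sketch below is only an assumption of how that could look: it works on a plain `Map<String, Object>` rather than on `TableRow` itself, `NullFieldStripper` and `withoutNullValues` are hypothetical names, and it has not been tried against the pipeline. Note that it changes the data, since an absent field is not the same as an explicit `null`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper; operates on a plain Map, not on TableRow. */
final class NullFieldStripper {

    /** Returns a copy of the row without entries whose value is null; the input is left untouched. */
    static Map<String, Object> withoutNullValues(Map<String, Object> row) {
        Map<String, Object> cleaned = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : row.entrySet()) {
            if (field.getValue() != null) {
                cleaned.put(field.getKey(), field.getValue());
            }
        }
        return cleaned;
    }

    public static void main(String[] args) {
        // Same shape as the second example row in this report.
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", "secondId");
        row.put("type", "messageType");
        row.put("ts", "1500907964217");
        row.put("message", null);

        System.out.println(withoutNullValues(row).keySet());
    }
}
```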