[
https://issues.apache.org/jira/browse/BEAM-3334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Thomas Groh reassigned BEAM-3334:
---------------------------------
Assignee: (was: Thomas Groh)
> NullPointerException in Direct Runner
> -------------------------------------
>
> Key: BEAM-3334
> URL: https://issues.apache.org/jira/browse/BEAM-3334
> Project: Beam
> Issue Type: Bug
> Components: runner-direct
> Affects Versions: 2.2.0
> Reporter: Victor Kotai
> Priority: Major
>
> We're currently migrating one of our pipelines from dataflow-sdk-1.9.1 to
> beam-2.2.0. To validate the behavior of the flow, we have several
> integration tests that read a file containing TableRows in JSON
> representation and then run the pipeline transformations on the resulting
> PCollection.
> Example of the rows:
> ```
> {"id":"firstId","type":"messageType","ts":"1500907784699",
> "message":"someMessage"}
> {"id":"secondId","type":"messageType","ts":"1500907964217", "message":null}
> ```
> In the test we've set up a pipeline that uses the `DirectRunner`. The
> pipeline reads a PCollection from a file and performs some operations on it.
> The code to read from a file now looks like this:
> ```
> public static PCollection<TableRow> readRowsFromFile(Pipeline pipeline, String filePath) {
>   return pipeline
>       .apply(TextIO.read().from(Utils.absolutePathOf(filePath)))
>       .apply(ParDo.of(new ParseTableRowFromJson()));
> }
> ```
> where `ParseTableRowFromJson` is:
> ```
> public class ParseTableRowFromJson extends DoFn<String, TableRow> {
>   private static Logger LOG = LoggerFactory.getLogger(ParseTableRowFromJson.class);
>
>   @ProcessElement
>   public void processElement(ProcessContext processContext) {
>     String input = processContext.element();
>     try {
>       ByteArrayInputStream inStream = new ByteArrayInputStream(input.getBytes("UTF-8"));
>       processContext.output(TableRowJsonCoder.of().decode(inStream, OUTER));
>     } catch (IOException e) {
>       LOG.warn("Failed parsing tableRow json: {}", input);
>     }
>   }
> }
> ```
> or, with a different implementation that does essentially the same thing:
> ```
> private static final ObjectMapper MAPPER = (new ObjectMapper());
>
> @ProcessElement
> public void processElement(ProcessContext processContext) {
>   String input = processContext.element();
>   try {
>     processContext.output(MAPPER.readValue(input, TableRow.class));
>   } catch (IOException e) {
>     LOG.warn("Failed parsing tableRow json: {}", input);
>   }
> }
> ```
> When we run this on the given input we get the following:
> ```
> Caused by: java.lang.NullPointerException
> at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
> at java.util.AbstractMap.hashCode(AbstractMap.java:530)
> at java.util.Arrays.hashCode(Arrays.java:4146)
> at java.util.Objects.hash(Objects.java:128)
> at org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow.hashCode(WindowedValue.java:284)
> at java.util.HashMap.hash(HashMap.java:338)
> at java.util.HashMap.get(HashMap.java:556)
> at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:191)
> at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractSetMultimap.put(AbstractSetMultimap.java:130)
> at org.apache.beam.runners.direct.repackaged.com.google.common.collect.HashMultimap.put(HashMultimap.java:48)
> at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:110)
> at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
> at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
> at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
> at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
> at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
> at ParseTableRowFromJson.processElement(ParseTableRowFromJson.java:22)
> ```
> The NPE is caused by a faulty `hashCode` implementation in
> `ArrayMap$Entry`; it occurs on the TableRow that contains a `null` value
> for `message`.
> I've submitted a PR to the library that should fix this: [Issue
> here|https://github.com/google/google-http-java-client/issues/384]
> I would guess it's now a matter of the library cutting a release and Beam
> updating its dependency to it? Or is there anything else that should be done?
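For illustration, here is a minimal sketch of the failure mode described in the report, using only the JDK. The classes `UnguardedEntry` and `NullSafeEntry` are hypothetical stand-ins and are not the actual `ArrayMap$Entry` code from google-http-java-client. The sketch assumes the entry's `hashCode` dereferences the value without a null check, and contrasts it with a variant that follows the `Map.Entry` contract, where a null key or value contributes 0. As in the stack trace, the entry's `hashCode` is reached through `AbstractMap.hashCode()`.

```java
import java.util.AbstractMap;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class NullValueEntryHashDemo {

    /** Hypothetical entry whose hashCode dereferences the value without a null check. */
    static final class UnguardedEntry extends AbstractMap.SimpleEntry<String, Object> {
        UnguardedEntry(String key, Object value) {
            super(key, value);
        }

        @Override
        public int hashCode() {
            // Throws NullPointerException when getValue() returns null.
            return getKey().hashCode() ^ getValue().hashCode();
        }
    }

    /** Same entry, hashed per the Map.Entry contract: a null key or value contributes 0. */
    static final class NullSafeEntry extends AbstractMap.SimpleEntry<String, Object> {
        NullSafeEntry(String key, Object value) {
            super(key, value);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(getKey()) ^ Objects.hashCode(getValue());
        }
    }

    /** Wraps one entry in a map so that AbstractMap.hashCode() calls the entry's hashCode(). */
    static Map<String, Object> singleEntryMap(final Map.Entry<String, Object> entry) {
        return new AbstractMap<String, Object>() {
            @Override
            public Set<Map.Entry<String, Object>> entrySet() {
                return Collections.singleton(entry);
            }
        };
    }

    /** Returns true if hashing the map throws a NullPointerException. */
    static boolean hashThrowsNpe(Map<String, Object> map) {
        try {
            map.hashCode();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Mirrors the second sample row, where "message" is null.
        System.out.println(hashThrowsNpe(singleEntryMap(new UnguardedEntry("message", null)))); // prints true
        System.out.println(hashThrowsNpe(singleEntryMap(new NullSafeEntry("message", null))));  // prints false
    }
}
```

Any collection that hashes its elements, such as the `HashMultimap` visible in the trace, would hit the same exception with the unguarded variant as soon as an element holds a null value.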