damccorm opened a new issue, #19713: URL: https://github.com/apache/beam/issues/19713
### Context

I have a Beam pipeline running on Dataflow using the Java SDK. It pulls Proto wrapper messages from a Pub/Sub subscription, partitions them by the oneof value, and applies a MapElements to extract the underlying Proto message, so that I end up with a `PCollectionList<T extends Message>`. I then do some more processing and try to write the results to different sinks. BigQueryIO works absolutely fine. However, both PubsubIO and ParquetIO fail. When using FileIO (for Parquet) I get this error:

```
java.lang.IllegalArgumentException: java.lang.NoSuchMethodException: com.google.protobuf.Message.getDefaultInstance()
    org.apache.beam.sdk.extensions.protobuf.ProtoCoder.getParser(ProtoCoder.java:288)
    org.apache.beam.sdk.extensions.protobuf.ProtoCoder.decode(ProtoCoder.java:192)
    org.apache.beam.sdk.extensions.protobuf.ProtoCoder.decode(ProtoCoder.java:108)
    org.apache.beam.runners.dataflow.worker.WindmillKeyedWorkItem.lambda$elementsIterable$2(WindmillKeyedWorkItem.java:107)
    org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators$7.transform(Iterators.java:750)
    org.apache.beam.vendor.guava.v20_0.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
    java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
    java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner$LateDataFilter.filter(LateDataDroppingDoFnRunner.java:128)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:76)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1287)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1024)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.base/java.lang.Thread.run(Thread.java:834)
```

and this for PubsubIO:

```
java.lang.IllegalArgumentException: java.lang.NoSuchMethodException: com.google.protobuf.Message.getDefaultInstance()
    org.apache.beam.sdk.extensions.protobuf.ProtoCoder.getParser(ProtoCoder.java:288)
    org.apache.beam.sdk.extensions.protobuf.ProtoCoder.decode(ProtoCoder.java:192)
    org.apache.beam.sdk.extensions.protobuf.ProtoCoder.decode(ProtoCoder.java:108)
    org.apache.beam.runners.dataflow.worker.WindmillKeyedWorkItem.lambda$elementsIterable$2(WindmillKeyedWorkItem.java:107)
    org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators$7.transform(Iterators.java:750)
    org.apache.beam.vendor.guava.v20_0.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
    java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
    java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner$LateDataFilter.filter(LateDataDroppingDoFnRunner.java:128)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:76)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1287)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1024)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.base/java.lang.Thread.run(Thread.java:834)
```

### Source Code for Error

ProtoCoder.java, lines 278-292: https://github.com/apache/beam/blob/968a80611d424764962e79b726bfa4fd17ced104/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java#L278

```
/** Get the memoized {@link Parser}, possibly initializing it lazily. */
private Parser<T> getParser() {
  if (memoizedParser == null) {
    try {
      @SuppressWarnings("unchecked")
      T protoMessageInstance = (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null);
      @SuppressWarnings("unchecked")
      Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType();
      memoizedParser = tParser;
    } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
      throw new IllegalArgumentException(e);
    }
  }
  return memoizedParser;
}
```

### Potential Solution?

I am guessing the "hack" proposed on Stack Overflow ([https://stackoverflow.com/questions/44134712/generic-protobuf-deserializer-in-java](https://stackoverflow.com/questions/44134712/generic-protobuf-deserializer-in-java)) could _potentially_ solve the problem:

```
ParameterizedType pType = (ParameterizedType) subclass.getGenericSuperclass();
Class<T> tClass = (Class<T>) pType.getActualTypeArguments()[0];
// In the case where the constructor for `T` takes no arguments.
parser = tClass.newInstance().getParserForType();
```

Thank you for taking the time to read this. I'd be more than happy to contribute to a solution, but I'm not a Proto/Beam or even Java super-user, so I would need some assistance.

Imported from Jira [BEAM-7938](https://issues.apache.org/jira/browse/BEAM-7938). Original Jira may contain additional context. Reported by: paliendroom.
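For reference, the failure mode can be reproduced with plain JDK reflection and stand-in types, without any protobuf or Beam dependency: generated message classes expose a public static `getDefaultInstance()` method, but the `com.google.protobuf.Message` interface itself declares no such method, so a `ProtoCoder` created for the bare interface has nothing to reflect on. The `FakeMessage`/`FakeConcreteProto` names below are hypothetical stand-ins, not real protobuf types.

```java
// Hypothetical stand-ins mirroring the protobuf class shape:
// generated classes carry a public static getDefaultInstance(),
// the Message interface does not.
interface FakeMessage {}

class FakeConcreteProto implements FakeMessage {
    public static FakeConcreteProto getDefaultInstance() {
        return new FakeConcreteProto();
    }
}

public class ReflectionCheck {
    // The same lookup ProtoCoder.getParser() performs via reflection.
    static boolean hasDefaultInstance(Class<?> clazz) {
        try {
            clazz.getMethod("getDefaultInstance");
            return true;
        } catch (NoSuchMethodException e) {
            // This is what ProtoCoder rethrows as IllegalArgumentException.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasDefaultInstance(FakeConcreteProto.class)); // true
        System.out.println(hasDefaultInstance(FakeMessage.class));       // false
    }
}
```

This suggests the error is triggered whenever Beam infers the coder for the interface type `Message` rather than a concrete generated class, which matches the generic `PCollectionList<T extends Message>` setup described above.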
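One caveat on the Stack Overflow trick: `getGenericSuperclass()` only recovers `T` when some subclass pins the type argument in its `extends` clause, which is not the case for a `PCollection` typed to the bare `Message` interface. A JDK-only sketch of the mechanism, using hypothetical `GenericCoder`/`ConcreteCoder` stand-ins rather than real Beam classes:

```java
import java.lang.reflect.ParameterizedType;

// Hypothetical generic base and a subclass that pins T to a concrete type;
// this is the only situation in which getGenericSuperclass() can recover T.
class GenericCoder<T> {}

class ConcreteCoder extends GenericCoder<String> {}

public class TypeArgCheck {
    // Mirrors the Stack Overflow suggestion: read T from the superclass signature.
    static Class<?> typeArgument(Class<?> subclass) {
        ParameterizedType pType = (ParameterizedType) subclass.getGenericSuperclass();
        return (Class<?>) pType.getActualTypeArguments()[0];
    }

    public static void main(String[] args) {
        System.out.println(typeArgument(ConcreteCoder.class).getName()); // java.lang.String
    }
}
```

If no subclass pins the type argument, `getGenericSuperclass()` returns a plain `Class` rather than a `ParameterizedType` and the cast fails, so this approach would presumably not help `ProtoCoder` when only the raw `Message` type is known at coder-inference time.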
