damccorm opened a new issue, #19713:
URL: https://github.com/apache/beam/issues/19713

   ### Context
   
   I have a Beam pipeline running on Dataflow (Java SDK) that pulls Proto wrapper messages from a Pub/Sub subscription. I partition these by the `oneof` value and then apply a `MapElements` to extract the underlying Proto message, so that I end up with a `PCollectionList<T extends Message>`. I then do some more processing and try to write each collection to a different sink. BigQueryIO works absolutely fine. However, when I try to use PubsubIO or ParquetIO, I end up with this error when using FileIO (for Parquet):
   
    
   ```
   java.lang.IllegalArgumentException: java.lang.NoSuchMethodException: com.google.protobuf.Message.getDefaultInstance()
       org.apache.beam.sdk.extensions.protobuf.ProtoCoder.getParser(ProtoCoder.java:288)
       org.apache.beam.sdk.extensions.protobuf.ProtoCoder.decode(ProtoCoder.java:192)
       org.apache.beam.sdk.extensions.protobuf.ProtoCoder.decode(ProtoCoder.java:108)
       org.apache.beam.runners.dataflow.worker.WindmillKeyedWorkItem.lambda$elementsIterable$2(WindmillKeyedWorkItem.java:107)
       org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators$7.transform(Iterators.java:750)
       org.apache.beam.vendor.guava.v20_0.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
       java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
       java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
       java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
       java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
       java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
       java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
       java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
       org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner$LateDataFilter.filter(LateDataDroppingDoFnRunner.java:128)
       org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:76)
       org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
       org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
       org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
       org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
       org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
       org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
       org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1287)
       org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
       org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1024)
       java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
       java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
       java.base/java.lang.Thread.run(Thread.java:834)
   ```
   
    
   
   For PubsubIO, I get an identical stack trace (same exception, same frames).
   
    
   ### Source Code for Error
   
   ProtoCoder.java, lines 278-292 (https://github.com/apache/beam/blob/968a80611d424764962e79b726bfa4fd17ced104/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java#L278):
   ```java
   /** Get the memoized {@link Parser}, possibly initializing it lazily. */
   private Parser<T> getParser() {
     if (memoizedParser == null) {
       try {
         @SuppressWarnings("unchecked")
         T protoMessageInstance = (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null);
         @SuppressWarnings("unchecked")
         Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType();
         memoizedParser = tParser;
       } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
         throw new IllegalArgumentException(e);
       }
     }
     return memoizedParser;
   }
   ```
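
   The trace shows `getParser()` looking for `getDefaultInstance()` on `com.google.protobuf.Message` itself, which suggests the coder was built with the `Message` interface as its class instead of a concrete generated message class. Generated message classes declare a static `getDefaultInstance()`; the `Message` interface does not, so the reflective lookup fails. The JDK-only sketch below reproduces that mechanism under stated assumptions: `MessageLike`, `MyEvent`, and `DefaultInstanceLookup` are hypothetical stand-ins, not protobuf or Beam types.

   ```java
   import java.lang.reflect.InvocationTargetException;
   import java.lang.reflect.Method;

   // Hypothetical stand-in for com.google.protobuf.Message: an interface with no static getDefaultInstance().
   interface MessageLike {}

   // Hypothetical stand-in for a protoc-generated message class, which does declare one.
   final class MyEvent implements MessageLike {
     private static final MyEvent DEFAULT_INSTANCE = new MyEvent();

     private MyEvent() {}

     public static MyEvent getDefaultInstance() {
       return DEFAULT_INSTANCE;
     }
   }

   public class DefaultInstanceLookup {
     // Same reflective pattern as ProtoCoder.getParser(): look up a static method by name, invoke with no receiver.
     static Object defaultInstanceOf(Class<?> messageClass) {
       try {
         Method method = messageClass.getMethod("getDefaultInstance");
         return method.invoke(null);
       } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
         throw new IllegalArgumentException(e);
       }
     }

     public static void main(String[] args) {
       // Concrete class token: the static factory is found and invoked.
       System.out.println(defaultInstanceOf(MyEvent.class).getClass().getSimpleName()); // MyEvent

       // Interface token: fails the same way as the reported trace.
       try {
         defaultInstanceOf(MessageLike.class);
       } catch (IllegalArgumentException e) {
         // java.lang.NoSuchMethodException: MessageLike.getDefaultInstance()
         System.out.println(e.getMessage());
       }
     }
   }
   ```

   If that is the cause here, a likely (untested) workaround is to set the coder explicitly on each partition after the `MapElements` step using the concrete generated class, e.g. `.setCoder(ProtoCoder.of(MyEvent.class))`, with the placeholder `MyEvent` replaced by the real message type.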
   
   ### Potential Solution?

   I am guessing the "hack" proposed on Stack Overflow ([https://stackoverflow.com/questions/44134712/generic-protobuf-deserializer-in-java](https://stackoverflow.com/questions/44134712/generic-protobuf-deserializer-in-java)) could _potentially_ solve the problem?
   ```java
   ParameterizedType pType = (ParameterizedType) subclass.getGenericSuperclass();
   Class<T> tClass = (Class<T>) pType.getActualTypeArguments()[0];
   // In the case where the constructor for `T` takes no arguments.
   parser = tClass.newInstance().getParserForType();
   ```
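
   One caveat with this approach: `getGenericSuperclass()` can only recover `T` when a concrete type argument is written in source, typically through an anonymous subclass. Inside generic code such as a `<T extends Message>` helper, the recovered argument is the type variable `T` itself, whose only class information is its bound. The JDK-only sketch below shows both cases; `TypeCapture` and `TypeCaptureDemo` are hypothetical names, and `CharSequence` stands in for the `Message` bound.

   ```java
   import java.lang.reflect.ParameterizedType;
   import java.lang.reflect.Type;
   import java.lang.reflect.TypeVariable;

   // Captures T from the generic superclass of a subclass, as in the Stack Overflow answer.
   abstract class TypeCapture<T> {
     final Type capturedType() {
       ParameterizedType pType = (ParameterizedType) getClass().getGenericSuperclass();
       return pType.getActualTypeArguments()[0];
     }
   }

   public class TypeCaptureDemo {
     // In generic code the anonymous subclass is TypeCapture<T>, so only the type variable T is recorded.
     static <T extends CharSequence> Type captureInsideGenericMethod() {
       return new TypeCapture<T>() {}.capturedType();
     }

     public static void main(String[] args) {
       // Concrete type argument written in source: the Class object is recoverable.
       Type concrete = new TypeCapture<String>() {}.capturedType();
       System.out.println(concrete == String.class); // true

       // Type argument is a type variable: there is no concrete class, only the bound.
       Type generic = captureInsideGenericMethod();
       System.out.println(generic instanceof TypeVariable<?>); // true
       System.out.println(((TypeVariable<?>) generic).getBounds()[0]); // interface java.lang.CharSequence
     }
   }
   ```

   In a pipeline like the one described, where the element type is only known as `T extends Message`, this would presumably leave the coder with nothing more specific than `Message` to work with.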
   
    
   
   Thank you for taking the time to read this. I'd be more than happy to contribute to a solution, but I'm not a Proto/Beam or even Java super-user, so I would need some assistance.
   
    
   
    
   
   Imported from Jira [BEAM-7938](https://issues.apache.org/jira/browse/BEAM-7938). Original Jira may contain additional context.
   Reported by: paliendroom.

