Lien Michiels created BEAM-7938:
-----------------------------------

             Summary: ProtoCoder <T extends Message> throws NoSuchMethodException: com.google.protobuf.Message.getDefaultInstance()
                 Key: BEAM-7938
                 URL: https://issues.apache.org/jira/browse/BEAM-7938
             Project: Beam
          Issue Type: Bug
          Components: io-java-files, io-java-gcp, io-java-parquet, sdk-java-core
    Affects Versions: 2.14.0
            Reporter: Lien Michiels


h3. Context

I have a Beam pipeline running on Dataflow (Java SDK) that pulls Protobuf wrapper messages from a Pub/Sub subscription. I partition these by the value of their oneof field and then apply a MapElements to extract the underlying Protobuf message, so that I end up with a PCollectionList<T extends Message>. After some further processing, I try to write the collections to different sinks. BigQueryIO works fine. However, both PubsubIO and ParquetIO fail. With FileIO (for Parquet) I get this error:

{code:java}
java.lang.IllegalArgumentException: java.lang.NoSuchMethodException: com.google.protobuf.Message.getDefaultInstance()
    org.apache.beam.sdk.extensions.protobuf.ProtoCoder.getParser(ProtoCoder.java:288)
    org.apache.beam.sdk.extensions.protobuf.ProtoCoder.decode(ProtoCoder.java:192)
    org.apache.beam.sdk.extensions.protobuf.ProtoCoder.decode(ProtoCoder.java:108)
    org.apache.beam.runners.dataflow.worker.WindmillKeyedWorkItem.lambda$elementsIterable$2(WindmillKeyedWorkItem.java:107)
    org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators$7.transform(Iterators.java:750)
    org.apache.beam.vendor.guava.v20_0.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
    java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
    java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner$LateDataFilter.filter(LateDataDroppingDoFnRunner.java:128)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:76)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1287)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1024)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.base/java.lang.Thread.run(Thread.java:834)
{code}

The PubsubIO sink fails with an identical stack trace, frame for frame.
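The trace names com.google.protobuf.Message rather than the concrete message class. A plausible explanation (an assumption, not verified against Beam's coder inference) is that after the MapElements the element type is only known as T extends Message, and at runtime a bare T carries no more information than its bound. The stdlib-only sketch below illustrates that erasure. FakeMessage, Person and extract are hypothetical stand-ins, not Protobuf or Beam types.

{code:java}
import java.lang.reflect.Method;
import java.lang.reflect.Type;

public class ErasureDemo {
    // Hypothetical stand-in for com.google.protobuf.Message (not the real Protobuf type).
    public interface FakeMessage {}

    // Hypothetical stand-in for a concrete generated message class.
    public static final class Person implements FakeMessage {}

    // Generic "extract the underlying message" step, declared only in terms of T.
    public static <T extends FakeMessage> T extract(T message) {
        return message;
    }

    // Returns the upper bound of extract's type variable T, as recorded in the class file.
    public static Type boundOfT() {
        try {
            Method m = ErasureDemo.class.getMethod("extract", FakeMessage.class);
            return m.getTypeParameters()[0].getBounds()[0];
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Person p = extract(new Person());
        // The object itself still knows its concrete class...
        System.out.println("runtime class: " + p.getClass().getSimpleName());
        // ...but the declared type T only tells reflection about its bound.
        System.out.println("declared bound of T: " + boundOfT().getTypeName());
    }
}
{code}

Each element object still reports Person as its runtime class, but anything that works only from the declared type sees FakeMessage.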
 
h3. Source Code for Error

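The exception is thrown by ProtoCoder.getParser() (ProtoCoder.java, lines 278-292: https://github.com/apache/beam/blob/968a80611d424764962e79b726bfa4fd17ced104/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java#L278), which reflectively invokes a static getDefaultInstance() method on the coder's protoMessageClass. The exception message shows that protoMessageClass is the com.google.protobuf.Message interface itself rather than a concrete generated message class, and Message declares no such static method.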
{code:java}
/** Get the memoized {@link Parser}, possibly initializing it lazily. */
private Parser<T> getParser() {
  if (memoizedParser == null) {
    try {
      @SuppressWarnings("unchecked")
      T protoMessageInstance = (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null);
      @SuppressWarnings("unchecked")
      Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType();
      memoizedParser = tParser;
    } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
      throw new IllegalArgumentException(e);
    }
  }
  return memoizedParser;
}
{code}
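To check the failure mode outside Beam, the sketch below repeats the same reflective lookup as getParser() against two hypothetical stand-ins: an interface with no static getDefaultInstance() (like Message) and a class that declares one (like a generated message class). Only the class succeeds. The interface produces an IllegalArgumentException wrapping a NoSuchMethodException, the same shape as the trace in this report.

{code:java}
import java.lang.reflect.InvocationTargetException;

public class DefaultInstanceLookup {
    // Hypothetical stand-in for com.google.protobuf.Message: an interface that,
    // like the real one, declares no static getDefaultInstance() method.
    public interface FakeMessage {}

    // Hypothetical stand-in for a generated message class, which does declare one.
    public static final class Person implements FakeMessage {
        private static final Person DEFAULT_INSTANCE = new Person();

        private Person() {}

        public static Person getDefaultInstance() {
            return DEFAULT_INSTANCE;
        }
    }

    // Same reflective pattern as ProtoCoder.getParser(): find and invoke a static method.
    public static Object defaultInstanceOf(Class<?> messageClass) {
        try {
            return messageClass.getMethod("getDefaultInstance").invoke(null);
        } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        // Works: the concrete class declares the static method.
        System.out.println(defaultInstanceOf(Person.class) == Person.getDefaultInstance());
        // Fails: the interface does not, so getMethod throws NoSuchMethodException.
        try {
            defaultInstanceOf(FakeMessage.class);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}

If this is the cause, a possible workaround (untested here) is to give the coder the concrete generated class explicitly, for example by calling setCoder(ProtoCoder.of(MyMessage.class)) on the output of the MapElements, where MyMessage is a placeholder for the actual message class.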
h3. Potential Solution?

Could the "hack" proposed on Stack Overflow ([https://stackoverflow.com/questions/44134712/generic-protobuf-deserializer-in-java]) _potentially_ solve the problem?
{code:java}
// Recover the class of T from the generic superclass of `subclass`.
ParameterizedType pType = (ParameterizedType) subclass.getGenericSuperclass();
@SuppressWarnings("unchecked")
Class<T> tClass = (Class<T>) pType.getActualTypeArguments()[0];
// Only works when the constructor for `T` takes no arguments.
parser = tClass.newInstance().getParserForType();
{code}
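One caveat with that approach: getGenericSuperclass() can only recover T when some class declaration binds it to a concrete type at compile time. If the type argument is itself still a type variable, the cast to Class fails with a ClassCastException. Also, as far as I know, generated protobuf-java message classes do not expose a public no-argument constructor, so invoking the static getDefaultInstance() reflectively is likely safer than newInstance(). The stdlib-only sketch below shows both points. FakeMessage, Person, Deserializer and its subclasses are hypothetical stand-ins, not Protobuf or Beam types.

{code:java}
import java.lang.reflect.ParameterizedType;

public class TypeTokenDemo {
    // Hypothetical stand-in for com.google.protobuf.Message.
    public interface FakeMessage {}

    // Hypothetical stand-in for a generated message class: private constructor,
    // public static getDefaultInstance().
    public static final class Person implements FakeMessage {
        private static final Person DEFAULT_INSTANCE = new Person();

        private Person() {}

        public static Person getDefaultInstance() {
            return DEFAULT_INSTANCE;
        }
    }

    // Hypothetical base class that recovers T from its subclass's declaration,
    // following the Stack Overflow approach.
    public abstract static class Deserializer<T extends FakeMessage> {
        @SuppressWarnings("unchecked")
        public Class<T> messageClass() {
            ParameterizedType pType = (ParameterizedType) getClass().getGenericSuperclass();
            // Throws ClassCastException if the type argument is a type variable, not a Class.
            return (Class<T>) pType.getActualTypeArguments()[0];
        }

        // Uses the static getDefaultInstance() instead of newInstance(), so no
        // accessible no-argument constructor is needed.
        public T defaultInstance() {
            Class<T> tClass = messageClass();
            try {
                return tClass.cast(tClass.getMethod("getDefaultInstance").invoke(null));
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException(e);
            }
        }
    }

    // Works: this declaration binds T to a concrete class at compile time.
    public static final class PersonDeserializer extends Deserializer<Person> {}

    // Does not work: T is still a type variable in the superclass declaration.
    public static final class GenericDeserializer<T extends FakeMessage> extends Deserializer<T> {}

    public static void main(String[] args) {
        PersonDeserializer bound = new PersonDeserializer();
        System.out.println(bound.messageClass().getSimpleName());
        System.out.println(bound.defaultInstance() == Person.getDefaultInstance());
        try {
            new GenericDeserializer<Person>().messageClass();
        } catch (ClassCastException e) {
            System.out.println("no concrete class to recover");
        }
    }
}
{code}

In a pipeline where the element type is only ever known as T extends Message, there may be no such binding declaration to recover the class from.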

Thank you for taking the time to read this. I'd be more than happy to contribute to a solution, but I'm not a Protobuf, Beam, or even Java power user, so I would need some assistance.

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
