cshuo opened a new issue, #14064:
URL: https://github.com/apache/hudi/issues/14064

   ### Bug Description
   
   **What happened:**
   
An exception is thrown in a Flink streaming read query when the Hudi Flink bundle jar is not placed in Flink's `lib` directory.
   
   ```
   ERROR org.apache.flink.table.gateway.service.operation.OperationManager [] - Failed to execute the operation 506bf246-a467-47ed-9c52-0d9db336418e.
   org.apache.flink.api.common.functions.InvalidTypesException: Internal error occurred.
        at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:573) ~[flink-dist-1.20.1.jar:1.20.1]
        at org.apache.flink.api.java.typeutils.TypeExtractor.getKeySelectorTypes(TypeExtractor.java:447) ~[flink-dist-1.20.1.jar:1.20.1]
        at org.apache.flink.api.java.typeutils.TypeExtractor.getKeySelectorTypes(TypeExtractor.java:438) ~[flink-dist-1.20.1.jar:1.20.1]
        at org.apache.flink.streaming.api.datastream.KeyedStream.<init>(KeyedStream.java:118) ~[flink-dist-1.20.1.jar:1.20.1]
        at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:295) ~[flink-dist-1.20.1.jar:1.20.1]
        at org.apache.hudi.table.HoodieTableSource.addFileDistributionStrategy(HoodieTableSource.java:256) ~[?:?]
        at org.apache.hudi.table.HoodieTableSource.access$700(HoodieTableSource.java:131) ~[?:?]
        at org.apache.hudi.table.HoodieTableSource$1.produceDataStream(HoodieTableSource.java:230) ~[?:?]
        at org.apache.hudi.adapter.DataStreamScanProviderAdapter.produceDataStream(DataStreamScanProviderAdapter.java:32) ~[?:?]
        at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan.translateToPlanInternal(CommonExecTableSourceScan.java:163) ~[flink-table-planner_2.12-1.20.1.jar:1.20.1]
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168) ~[flink-table-planner_2.12-1.20.1.jar:1.20.1]
        at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259) ~[flink-table-planner_2.12-1.20.1.jar:1.20.1]
        at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:177) ~[flink-table-planner_2.12-1.20.1.jar:1.20.1]
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168) ~[flink-table-planner_2.12-1.20.1.jar:1.20.1]
        at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85) ~[flink-table-planner_2.12-1.20.1.jar:1.20.1]
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) ~[flink-scala_2.12-1.20.1.jar:1.20.1]
        at scala.collection.Iterator.foreach(Iterator.scala:937) ~[flink-scala_2.12-1.20.1.jar:1.20.1]
        at scala.collection.Iterator.foreach$(Iterator.scala:937) ~[flink-scala_2.12-1.20.1.jar:1.20.1]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) ~[flink-scala_2.12-1.20.1.jar:1.20.1]
        at scala.collection.IterableLike.foreach(IterableLike.scala:70) ~[flink-scala_2.12-1.20.1.jar:1.20.1]
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69) ~[flink-scala_2.12-1.20.1.jar:1.20.1]
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-scala_2.12-1.20.1.jar:1.20.1]
        at scala.collection.TraversableLike.map(TraversableLike.scala:233) ~[flink-scala_2.12-1.20.1.jar:1.20.1]
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226) ~[flink-scala_2.12-1.20.1.jar:1.20.1]
        at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-scala_2.12-1.20.1.jar:1.20.1]
        at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84) ~[flink-table-planner_2.12-1.20.1.jar:1.20.1]
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:180) ~[flink-table-planner_2.12-1.20.1.jar:1.20.1]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1308) ~[flink-table-api-java-uber-1.20.1.jar:1.20.1]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:874) ~[flink-table-api-java-uber-1.20.1.jar:1.20.1]
        at org.apache.flink.table.gateway.service.operation.OperationExecutor.callModifyOperations(OperationExecutor.java:644) ~[flink-sql-gateway-1.20.1.jar:1.20.1]
        at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:483) ~[flink-sql-gateway-1.20.1.jar:1.20.1]
        at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:243) ~[flink-sql-gateway-1.20.1.jar:1.20.1]
        at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:199) ~[flink-sql-gateway-1.20.1.jar:1.20.1]
        at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:214) ~[flink-sql-gateway-1.20.1.jar:1.20.1]
        at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119) ~[flink-sql-gateway-1.20.1.jar:1.20.1]
        at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258) ~[flink-sql-gateway-1.20.1.jar:1.20.1]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]
   Caused by: org.apache.flink.api.java.typeutils.TypeExtractionException: Could not extract lambda method out of function: ClassNotFoundException - org.apache.hudi.table.format.mor.MergeOnReadInputSplit
        at org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda(TypeExtractionUtils.java:163) ~[flink-dist-1.20.1.jar:1.20.1]
        at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:571) ~[flink-dist-1.20.1.jar:1.20.1]
        ... 42 more
   Caused by: java.lang.ClassNotFoundException: org.apache.hudi.table.format.mor.MergeOnReadInputSplit
        at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[?:?]
        at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[?:?]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?]
        at java.lang.Class.forName0(Native Method) ~[?:?]
        at java.lang.Class.forName(Class.java:398) ~[?:?]
        at org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda(TypeExtractionUtils.java:133) ~[flink-dist-1.20.1.jar:1.20.1]
        at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:571) ~[flink-dist-1.20.1.jar:1.20.1]
        ... 42 more
   ```
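
   The root `ClassNotFoundException` indicates that `MergeOnReadInputSplit` is not visible to the application classloader (`AppClassLoader` in the trace), which is the usual symptom when the Hudi bundle is only on the user-code classpath. A minimal sketch of the common remedy, copying the bundle into Flink's `lib/` directory; the paths and the bundle jar name/version below are assumptions, demonstrated here against a temp directory rather than a real install:

   ```shell
   # Simulated FLINK_HOME; on a real cluster use your actual install, e.g. /opt/flink
   FLINK_HOME="$(mktemp -d)"
   mkdir -p "$FLINK_HOME/lib"

   # Stand-in for the downloaded Hudi Flink bundle (assumed name/version)
   BUNDLE="hudi-flink1.20-bundle-1.0.0.jar"
   touch "/tmp/$BUNDLE"

   # The actual fix: place the bundle in lib/ so Flink's application
   # classloader (not only the per-job user classloader) can load Hudi classes
   cp "/tmp/$BUNDLE" "$FLINK_HOME/lib/"
   ls "$FLINK_HOME/lib/"
   ```

   After copying the jar on a real deployment, the cluster and the SQL Gateway need to be restarted so the new classpath entry is picked up.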
   
   
   ### Environment
   
   **Hudi version:**
   **Query engine:** (Spark/Flink/Trino etc)
   **Relevant configs:**
   
   
   ### Logs and Stack Trace
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
