ailinzhou opened a new issue, #10576:
URL: https://github.com/apache/hudi/issues/10576

   I am currently hitting an issue when reading a Hudi table with Flink. The problem arises when the Flink RuntimeMode is configured as 'batch' and Hudi FlinkOptions.READ_AS_STREAMING is set to 'false'.
   
   
   **To Reproduce**
   
   1. Set Flink RuntimeMode to 'batch'. 
   2. Set Hudi FlinkOptions.READ_AS_STREAMING to 'false'. 
   3. Attempt to read Hudi with Flink.
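   The steps above can be sketched as Flink SQL statements; the table name, schema, and path are hypothetical placeholders, not taken from the original report:

   ``` sql
   -- Step 1: force batch execution.
   SET 'execution.runtime-mode' = 'batch';

   -- Step 2: hypothetical Hudi table; 'read.streaming.enabled' is the SQL key
   -- behind FlinkOptions.READ_AS_STREAMING.
   CREATE TABLE hudi_tbl (
     id BIGINT,
     name STRING,
     ts TIMESTAMP(3)
   ) WITH (
     'connector' = 'hudi',
     'path' = 'file:///tmp/hudi_tbl',
     'read.streaming.enabled' = 'false'
   );

   -- Step 3: the batch read that triggers the failure reported below.
   SELECT * FROM hudi_tbl;
   ```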
   
   **Expected behavior**
   
   I expected to successfully read the Hudi table in batch mode with Flink under these configurations.
   
   **Actual behavior**
   
   A failure occurs when attempting to read Hudi with Flink under these 
configurations.
   
   
   **Environment Description**
   
   * Hudi version: 1.10 ~ 1.14
   
   * Flink version: 1.13
   
   **Additional context**
   
   In the `HoodieTableSource` implementation of Flink's `DynamicTableSource`, a `ScanRuntimeProvider` is provided. This `ScanRuntimeProvider` implements the `produceDataStream` method, which creates a `DataStreamSource`. However, in bounded mode it does not explicitly specify the `Boundedness` parameter, so Flink falls back to the default `Boundedness.CONTINUOUS_UNBOUNDED`, which is likely the cause of this issue.
   
   [Code in Hudi HoodieTableSource.java](https://github.com/apache/hudi/blob/4c7ac6112daab349ebcdd1fbb2216d9d1138ca14/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java#L227C1-L228C1)
   
   ``` java
           if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
             ...
           } else {
             ...
             DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo);
             ...
           }
   ```
   Perhaps the code could be modified as follows:
   
   ``` java
           if (!isBounded()) {
             ...
           } else {
             ...
             DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo, Boundedness.BOUNDED);
             ...
           }
   ```
   
   **Stacktrace**
   
   ``` java
   Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source with the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC

   org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Detected an UNBOUNDED source with the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:381)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
   ```
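
   Until the source declares itself bounded, the only runtime modes the error message itself permits are STREAMING and AUTOMATIC; a possible workaround sketch (not verified to restore full batch semantics) is:

   ``` sql
   -- Workaround sketch: let Flink infer the mode instead of forcing BATCH.
   SET 'execution.runtime-mode' = 'automatic';
   SELECT * FROM hudi_tbl;  -- hudi_tbl: a hypothetical Hudi table with 'read.streaming.enabled' = 'false'
   ```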
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
