This is an automated email from the ASF dual-hosted git repository.

paullin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new fc3a215c1 [KYUUBI #5789] Flink engine 
kyuubi.session.engine.flink.fetch.timeout parameter on the server is not 
effective
fc3a215c1 is described below

commit fc3a215c1639604202a3f3e0571bcbc57fbaf629
Author: junjie.miao <[email protected]>
AuthorDate: Thu Nov 30 18:17:24 2023 +0800

    [KYUUBI #5789] Flink engine kyuubi.session.engine.flink.fetch.timeout 
parameter on the server is not effective
    
    # :mag: Description
    ## Issue References 🔗
    
    This pull request fixes #5789
    
    ## Describe Your Solution 🔧
    
    First, read the server-side 'kyuubi.session.engine.flink.fetch.timeout' parameter; if it is not set on the server, the default is an unlimited duration. Then read the client-side 'kyuubi.session.engine.flink.fetch.timeout' parameter; if the client does not set it, the server value is used.
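    
    The fallback order described above can be sketched in isolation. This is a minimal illustration, not the engine code: `FetchTimeoutFallback`, `resolve`, and the plain `Map` configs are hypothetical names, and values are assumed to be millisecond counts stored as strings.

```scala
import scala.concurrent.duration._

object FetchTimeoutFallback {
  // Config key taken from the commit message; the Map-based configs are an assumption.
  val FetchTimeoutKey = "kyuubi.session.engine.flink.fetch.timeout"

  /** Session (client) value wins, then the server value, otherwise unlimited. */
  def resolve(serverConf: Map[String, String], sessionConf: Map[String, String]): Duration = {
    // Server-side default: unlimited when the key is absent.
    val serverDefault: Duration =
      serverConf.get(FetchTimeoutKey).map(_.toLong.milliseconds).getOrElse(Duration.Inf)
    // Client-side override: fall back to the server default when the key is absent.
    sessionConf.get(FetchTimeoutKey).map(_.toLong.milliseconds).getOrElse(serverDefault)
  }
}
```

    Before this change, the last fallback went straight to an unlimited duration, so a timeout configured only on the server was ignored.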
    
    ## Types of changes :bookmark:
    
    - [x] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklists
    ## 📝 Author Self Checklist
    
    - [x] My code follows the [style 
guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html)
 of this project
    - [x] I have performed a self-review
    - [x] I have commented my code, particularly in hard-to-understand areas
    - [ ] I have made corresponding changes to the documentation
    - [x] My changes generate no new warnings
    - [ ] I have added tests that prove my fix is effective or that my feature 
works
    - [x] New and existing unit tests pass locally with my changes
    - [x] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    ## 📝 Committer Pre-Merge Checklist
    
    - [x] Pull request title is okay.
    - [x] No license issues.
    - [x] Milestone correctly set?
    - [x] Test coverage is ok
    - [x] Assignees are selected.
    - [x] Minimum number of approvals
    - [x] No changes are requested
    
    Closes #5790 from junjiem/fix-5789.
    
    Closes #5789
    
    925ac25f3 [junjie.miao] fix flink engine 
kyuubi.session.engine.flink.fetch.timeout parameter on the server is not 
effective bug
    
    Authored-by: junjie.miao <[email protected]>
    Signed-off-by: Paul Lin <[email protected]>
---
 .../kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala     | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index d5c0629ee..3bb947e07 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -37,6 +37,9 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
 
   private lazy val resultMaxRowsDefault = getConf.get(ENGINE_FLINK_MAX_ROWS)
 
+  private lazy val resultFetchTimeoutDefault = getConf.get(ENGINE_FLINK_FETCH_TIMEOUT)
+    .map(_ milliseconds).getOrElse(Duration.Inf)
+
   private lazy val operationConvertCatalogDatabaseDefault =
     getConf.get(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED)
 
@@ -71,7 +74,7 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
 
     val resultFetchTimeout =
       flinkSession.normalizedConf.get(ENGINE_FLINK_FETCH_TIMEOUT.key).map(_.toLong milliseconds)
-        .getOrElse(Duration.Inf)
+        .getOrElse(resultFetchTimeoutDefault)
 
     val op = mode match {
       case NoneMode =>
