[ 
https://issues.apache.org/jira/browse/FLINK-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16478992#comment-16478992
 ] 

ASF GitHub Bot commented on FLINK-8861:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5660#discussion_r188939606
  
    --- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
    @@ -293,59 +339,75 @@ public ResultDescriptor executeQuery(SessionContext 
session, String query) throw
        }
     
        @Override
    -   public TypedResult<Integer> snapshotResult(SessionContext session, 
String resultId, int pageSize) throws SqlExecutionException {
    -           final DynamicResult result = resultStore.getResult(resultId);
    -           if (result == null) {
    -                   throw new SqlExecutionException("Could not find a 
result with result identifier '" + resultId + "'.");
    -           }
    -           if (!result.isMaterialized()) {
    -                   throw new SqlExecutionException("Invalid result 
retrieval mode.");
    +   public TypedResult<Integer> snapshotResult(SessionContext session, 
String resultId, int
    +                   pageSize) throws SqlExecutionException {
    +           if (!resultStore.isStatic(resultId)) {
    +                   final DynamicResult result = 
resultStore.getDynamicResult(resultId);
    +                   if (result == null) {
    +                           throw new SqlExecutionException("Could not find 
a result with result identifier '" + resultId + "'.");
    +                   }
    +                   if (!result.isMaterialized()) {
    +                           throw new SqlExecutionException("Invalid result 
retrieval mode.");
    +                   }
    +                   return ((MaterializedResult) result).snapshot(pageSize);
    +           } else {
    +                   StaticResult staticResult = 
resultStore.getStaticResult(resultId);
    +                   return staticResult.snapshot(pageSize);
                }
    -           return ((MaterializedResult) result).snapshot(pageSize);
        }
     
        @Override
    -   public List<Row> retrieveResultPage(String resultId, int page) throws 
SqlExecutionException {
    -           final DynamicResult result = resultStore.getResult(resultId);
    -           if (result == null) {
    -                   throw new SqlExecutionException("Could not find a 
result with result identifier '" + resultId + "'.");
    -           }
    -           if (!result.isMaterialized()) {
    -                   throw new SqlExecutionException("Invalid result 
retrieval mode.");
    +   public List<Row> retrieveResultPage(String resultId, int page) throws
    +                   SqlExecutionException {
    +           if (!resultStore.isStatic(resultId)) {
    +                   final DynamicResult result = 
resultStore.getDynamicResult(resultId);
    +                   if (result == null) {
    +                           throw new SqlExecutionException("Could not find 
a result with result identifier '" + resultId + "'.");
    +                   }
    +                   if (!result.isMaterialized()) {
    +                           throw new SqlExecutionException("Invalid result 
retrieval mode.");
    +                   }
    +                   return ((MaterializedResult) result).retrievePage(page);
    +           } else {
    +                   return 
resultStore.getStaticResult(resultId).retrievePage(page);
                }
    -           return ((MaterializedResult) result).retrievePage(page);
        }
     
        @Override
        public void cancelQuery(SessionContext session, String resultId) throws 
SqlExecutionException {
    -           final DynamicResult result = resultStore.getResult(resultId);
    -           if (result == null) {
    -                   throw new SqlExecutionException("Could not find a 
result with result identifier '" + resultId + "'.");
    -           }
    +           if (!resultStore.isStatic(resultId)) {
    --- End diff --
    
    Do we really need the distinction between dynamic and static results here? 
To the executor it should not actually matter. It should just kill whatever Flink job 
is running.
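
    A minimal sketch of what a cancellation path without that distinction could look like. Everything here is hypothetical and not the actual Flink SQL Client API: `CancellableResult`, `UnifiedResultStore`, `RecordingResult` and `CancelSketch` are made-up names, and `SqlExecutionException` is redeclared as an unchecked stand-in so the snippet compiles on its own. The assumption is that dynamic and static results could share one small interface exposing `close()`, so `cancelQuery` needs a single lookup and no `isStatic` branch.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the SQL Client exception type (assumption: unchecked here for brevity).
class SqlExecutionException extends RuntimeException {
    SqlExecutionException(String message) {
        super(message);
    }
}

// Hypothetical common interface that both dynamic and static results would implement.
interface CancellableResult {
    // Stops whatever job backs this result and releases its resources.
    void close();
}

// Hypothetical result that only records whether it was closed (for illustration).
class RecordingResult implements CancellableResult {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical store that keeps all results in one map, regardless of their kind.
class UnifiedResultStore {
    private final Map<String, CancellableResult> results = new HashMap<>();

    void store(String resultId, CancellableResult result) {
        results.put(resultId, result);
    }

    // Removes and returns the result, or null if the identifier is unknown.
    CancellableResult remove(String resultId) {
        return results.remove(resultId);
    }
}

class CancelSketch {
    // One code path for every result kind: look it up, fail if missing, close it.
    static void cancelQuery(UnifiedResultStore store, String resultId) {
        final CancellableResult result = store.remove(resultId);
        if (result == null) {
            throw new SqlExecutionException(
                "Could not find a result with result identifier '" + resultId + "'.");
        }
        result.close();
    }
}
```

    With this shape, cancelling the same identifier twice raises the "Could not find a result" error on the second call, because the first call already removed the entry from the store.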


> Add support for batch queries in SQL Client
> -------------------------------------------
>
>                 Key: FLINK-8861
>                 URL: https://issues.apache.org/jira/browse/FLINK-8861
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Xingcan Cui
>            Priority: Major
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
> Similar to streaming queries, it should be possible to execute batch queries 
> in the SQL Client and collect the results using {{DataSet.collect()}} for 
> debugging purposes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
