[ https://issues.apache.org/jira/browse/FLINK-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16478991#comment-16478991 ]

ASF GitHub Bot commented on FLINK-8861:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5660#discussion_r188938891
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java ---
    @@ -233,56 +235,100 @@ public ResultDescriptor executeQuery(SessionContext session, String query) throw
                // deployment
            final ClusterClient<?> clusterClient = createDeployment(mergedEnv.getDeployment());
     
    -           // initialize result
    -           final DynamicResult result = resultStore.createResult(
    -                   mergedEnv,
    -                   resultSchema,
    -                   context.getExecutionConfig());
    +           if (mergedEnv.getExecution().isStreamingExecution()) {
    +                   // initialize result
    +                   final DynamicResult result = resultStore.createDynamicResult(
    +                                   mergedEnv,
    +                                   resultSchema,
    +                                   context.getExecutionConfig());
     
    -           // create job graph with jars
    -           final JobGraph jobGraph;
    -           try {
    -                   jobGraph = createJobGraph(context, context.getSessionContext().getName() + ": " + query, table,
    -                           result.getTableSink(),
    -                           clusterClient);
    -           } catch (Throwable t) {
    -                   // the result needs to be closed as long as
    -                   // it is not stored in the result store
    -                   result.close();
    -                   throw t;
    -           }
    +                   // create job graph with jars
    +                   final JobGraph jobGraph;
    +                   try {
    +                           jobGraph = createJobGraph(context, context.getSessionContext().getName() + ": " + query, table,
    +                                           result.getTableSink(),
    +                                           clusterClient);
    +                   } catch (Throwable t) {
    +                           // the result needs to be closed as long as
    +                           // it is not stored in the result store
    +                           result.close();
    +                           throw t;
    +                   }
    +
    +                   // store the result with a unique id (the job id for now)
    +                   final String resultId = jobGraph.getJobID().toString();
    +                   resultStore.storeDynamicResult(resultId, result);
    +
    +                   // create execution
    +                   final Runnable program = () -> {
    +                           // we need to submit the job attached for now
    +                           // otherwise it is not possible to retrieve the reason why an execution failed
    +                           try {
    +                                   clusterClient.run(jobGraph, context.getClassLoader());
    +                           } catch (ProgramInvocationException e) {
    +                                   throw new SqlExecutionException("Could not execute table program.", e);
    +                           } finally {
    +                                   try {
    +                                           clusterClient.shutdown();
    +                                   } catch (Exception e) {
    +                                           // ignore
    +                                   }
    +                           }
    +                   };
    +
    +                   // start result retrieval
    +                   result.startRetrieval(program);
    +
    +                   return new ResultDescriptor(resultId, resultSchema, result.isMaterialized());
     
    -           // store the result with a unique id (the job id for now)
    -           final String resultId = jobGraph.getJobID().toString();
    -           resultStore.storeResult(resultId, result);
    +           } else if (mergedEnv.getExecution().isBatchExecution()) {
     
    -           // create execution
    -           final Runnable program = () -> {
    -                   // we need to submit the job attached for now
    -                   // otherwise it is not possible to retrieve the reason why an execution failed
    +                   if (!mergedEnv.getExecution().isTableMode()) {
    +                           throw new SqlExecutionException("Batch queries can only be executed in table mode.");
    +                   }
    +
    +                   BatchResult batchResult = new BatchResult();
    +                   final JobGraph jobGraph = createJobGraph(
    +                                   context,
    +                                   context.getSessionContext().getName() + ": " + query,
    +                                   table,
    +                                   batchResult.getTableSink(),
    +                                   clusterClient);
    +
    +                   final String resultId = jobGraph.getJobID().toString();
    +                   final JobExecutionResult result;
                        try {
    -                           clusterClient.run(jobGraph, context.getClassLoader());
    +                           // Fetch the result synchronously.
    +                           result = clusterClient.run(jobGraph, context.getClassLoader());
                        } catch (ProgramInvocationException e) {
                                throw new SqlExecutionException("Could not execute table program.", e);
    -                   } finally {
    +                   }
    +
    +                   ArrayList<byte[]> accResult = result.getAccumulatorResult(batchResult.getAccumulatorName());
    +                   if (accResult != null) {
                                try {
    -                                   clusterClient.shutdown();
    -                           } catch (Exception e) {
    -                                   // ignore
    +                                   List<Row> resultTable = SerializedListAccumulator.deserializeList(accResult, batchResult.getSerializer());
    --- End diff --
    
    Can we move the deserialization logic into the result class? The executor 
should not need to handle internals of a result.
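    
    A rough sketch of that direction, assuming a hypothetical retrieveResultTable method on BatchResult (the accumulator name, serializer wiring, and signatures below are illustrative, not the actual SQL Client API):
    
        import java.io.IOException;
        import java.util.ArrayList;
        import java.util.Collections;
        import java.util.List;
        
        import org.apache.flink.api.common.JobExecutionResult;
        import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
        import org.apache.flink.api.common.typeutils.TypeSerializer;
        import org.apache.flink.table.client.gateway.SqlExecutionException;
        import org.apache.flink.types.Row;
        
        public class BatchResult {
        
            private final String accumulatorName = "batchResult"; // illustrative name
            private TypeSerializer<Row> serializer; // assumed to be set when the table sink is created
        
            public String getAccumulatorName() {
                return accumulatorName;
            }
        
            // Deserialization happens inside the result class, so the executor
            // never has to touch SerializedListAccumulator directly.
            public List<Row> retrieveResultTable(JobExecutionResult executionResult) {
                final ArrayList<byte[]> accResult = executionResult.getAccumulatorResult(accumulatorName);
                if (accResult == null) {
                    return Collections.emptyList();
                }
                try {
                    return SerializedListAccumulator.deserializeList(accResult, serializer);
                } catch (IOException | ClassNotFoundException e) {
                    throw new SqlExecutionException("Could not deserialize the batch result.", e);
                }
            }
        }
    
    The executor would then only call batchResult.retrieveResultTable(result) after the job returns, without knowing how rows are encoded.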


> Add support for batch queries in SQL Client
> -------------------------------------------
>
>                 Key: FLINK-8861
>                 URL: https://issues.apache.org/jira/browse/FLINK-8861
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Xingcan Cui
>            Priority: Major
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
> Similar to streaming queries, it should be possible to execute batch queries 
> in the SQL Client and collect the results using {{DataSet.collect()}} for 
> debugging purposes.
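
For reference, the intended debugging workflow in terms of the DataSet API looks roughly like the following minimal sketch (the table name and sample data are illustrative):

    import java.util.List;
    
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    import org.apache.flink.types.Row;
    
    public class BatchSqlCollectSketch {
    
        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    
            // register a small in-memory table for the batch query
            final DataSet<Tuple2<Integer, String>> input = env.fromElements(
                    Tuple2.of(1, "hello"),
                    Tuple2.of(2, "world"));
            tableEnv.registerDataSet("Greetings", input, "id, word");
    
            // run a batch SQL query and collect the result for inspection
            final Table result = tableEnv.sqlQuery("SELECT id, word FROM Greetings WHERE id > 1");
            final List<Row> rows = tableEnv.toDataSet(result, Row.class).collect();
            rows.forEach(System.out::println);
        }
    }

The SQL Client should offer the same kind of synchronous result retrieval without requiring the user to write such a program.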



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
