Hi Timo,

I agree that streaming queries are our top priority,
but I think there are too many open questions to discuss for multiline
statements, e.g.:
1. What is the behavior of mixed DDL and DML under async execution?
create table t1 xxx;
create table t2 xxx;
insert into t2 select * from t1 where xxx;
drop table t1; // t1 may be a MySQL table, the data will also be deleted.

t1 is dropped while the "insert" job is still running.

2. What is the behavior of the unified scenario under async execution (as you
mentioned)?
INSERT INTO t1 SELECT * FROM s;
INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;

The result of the second statement is nondeterministic, because the first
statement may still be running.
I think we need to put a lot of effort into defining the behavior of logically
related queries.
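
To make the first scenario concrete, here is a minimal Java sketch of the
race. It does not use any Flink API: the `catalog` map, `insertAsync`, and the
latch that holds the job back are all hypothetical stand-ins, chosen only so
that the interleaving is reproducible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

final class AsyncDdlDmlSketch {

    /** Hypothetical in-memory catalog: table name -> rows. Not a Flink API. */
    static final Map<String, List<String>> catalog = new ConcurrentHashMap<>();

    /** Simulates "insert into t2 select * from t1" submitted asynchronously. */
    static CompletableFuture<Integer> insertAsync(CountDownLatch sourceReadGate) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // The job is "running" but has not read its source yet.
                sourceReadGate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            List<String> source = catalog.get("t1");
            if (source == null) {
                throw new IllegalStateException(
                        "t1 was dropped while the insert job was running");
            }
            catalog.put("t2", new ArrayList<>(source));
            return source.size();
        });
    }

    /** Runs the four statements in order and reports what happened to the job. */
    static String run() {
        catalog.clear();
        catalog.put("t1", new ArrayList<>(Arrays.asList("a", "b"))); // create table t1
        catalog.put("t2", new ArrayList<>());                        // create table t2
        CountDownLatch gate = new CountDownLatch(1);
        CompletableFuture<Integer> job = insertAsync(gate);          // returns immediately
        catalog.remove("t1");                                        // drop table t1
        gate.countDown();                                            // the job now reads t1
        try {
            return "inserted " + job.join() + " rows";
        } catch (CompletionException e) {
            return "job failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the gate is released only after the drop, the sketch always reports
the failure. On a real cluster the outcome would depend on timing, which is
exactly the problem. Scenario 2 has the same shape, with the second INSERT
reading t1 while the first one may still be writing to it.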

In this FLIP, I suggest we handle only single statements and also
introduce an async execute method,
which is more important and more frequently used by users.

For the sync methods (like `TableEnvironment.executeSql` and
`StatementSet.execute`),
the result is returned only after the job has finished. The following
methods will be introduced in this FLIP:

/**
 * Asynchronously execute the given single statement
 */
TableEnvironment.executeSqlAsync(String statement): TableResult

/**
 * Asynchronously execute the dml statements as a batch
 */
StatementSet.executeAsync(): TableResult

public interface TableResult {
   /**
    * Returns the JobClient for DQL and DML in async mode, otherwise
    * Optional.empty.
    */
   Optional<JobClient> getJobClient();
}
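
As a rough illustration of how a caller might use this, the following Java
sketch models the proposed contract with stand-in types. `JobClient` and
`TableResult` here are reduced local interfaces, not the real Flink classes,
and the statement classifier and job ids are assumptions made up for the
example.

```java
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

final class AsyncResultSketch {

    /** Stand-in for Flink's JobClient; only a job id is modelled here. */
    interface JobClient {
        String getJobId();
    }

    /** Stand-in for the proposed TableResult, reduced to the method under discussion. */
    interface TableResult {
        Optional<JobClient> getJobClient();
    }

    private static final AtomicInteger NEXT_JOB = new AtomicInteger(1);

    /** Crude classifier (an assumption of this sketch): only INSERT and SELECT submit a job. */
    static boolean submitsJob(String statement) {
        String s = statement.trim().toUpperCase(Locale.ROOT);
        return s.startsWith("INSERT") || s.startsWith("SELECT");
    }

    /** Toy version of executeSqlAsync: returns right after the pretend submission. */
    static TableResult executeSqlAsync(String statement) {
        if (!submitsJob(statement)) {
            // DDL and similar statements do not submit a job.
            return () -> Optional.empty();
        }
        String jobId = "job-" + NEXT_JOB.getAndIncrement(); // made-up id format
        JobClient client = () -> jobId;
        return () -> Optional.of(client);
    }

    /** What a caller would typically do with the result. */
    static String describe(TableResult result) {
        return result.getJobClient()
                .map(client -> "job submitted: " + client.getJobId())
                .orElse("no job");
    }

    public static void main(String[] args) {
        System.out.println(describe(executeSqlAsync("CREATE TABLE t1 (id INT)")));
        System.out.println(describe(executeSqlAsync("INSERT INTO t2 SELECT * FROM t1")));
    }
}
```

The point of returning an Optional is that the caller does not need to know
in advance whether a statement submits a job; it only checks whether a
JobClient is present.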

What do you think?

Best,
Godfrey

Timo Walther <twal...@apache.org> wrote on Thu, Mar 26, 2020 at 9:15 PM:

> Hi Godfrey,
>
> executing streaming queries must be our top priority because this is
> what distinguishes Flink from competitors. If we change the execution
> behavior, we should think about the other cases as well to not break the
> API a third time.
>
> I fear that just having an async execute method will not be enough
> because users should be able to mix streaming and batch queries in a
> unified scenario.
>
> If I remember it correctly, we had some discussions in the past about
> what decides about the execution mode of a query. Currently, we would
> like to let the query decide, not derive it from the sources.
>
> So I could imagine a multiline pipeline as:
>
> USE CATALOG 'mycat';
> INSERT INTO t1 SELECT * FROM s;
> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
>
> For executeMultilineSql():
>
> sync because regular SQL
> sync because regular Batch SQL
> async because Streaming SQL
>
> For executeAsyncMultilineSql():
>
> async because everything should be async
> async because everything should be async
> async because everything should be async
>
> What we should not start for executeAsyncMultilineSql():
>
> sync because DDL
> async because everything should be async
> async because everything should be async
>
> What are your thoughts here?
>
> Regards,
> Timo
>
>
> On 26.03.20 12:50, godfrey he wrote:
> > Hi Timo,
> >
> > I agree with you that streaming queries mostly need async execution.
> > In fact, our original plan was to introduce only sync methods in this FLIP,
> > and to introduce async methods (like "executeSqlAsync") in the future,
> > as mentioned in the appendix.
> >
> > Maybe the async methods also need to be considered in this FLIP.
> >
> > I think sync methods are also useful for streaming, where they can be used
> > to run bounded sources.
> > Maybe we should check whether all sources are bounded in sync execution
> > mode.
> >
> >> Also, if we block for streaming queries, we could never support
> >> multiline files. Because the first INSERT INTO would block the further
> >> execution.
> > I agree with you; we need an async method to submit multiline files,
> > and for streaming the files should be restricted so that DQL and DML
> > always come last.
> >
> > Best,
> > Godfrey
> >
> > Timo Walther <twal...@apache.org> wrote on Thu, Mar 26, 2020 at 4:29 PM:
> >
> >> Hi Godfrey,
> >>
> >> having control over the job after submission is a requirement that was
> >> requested frequently (some examples are [1], [2]). Users would like to
> >> get insights about the running or completed job. Including the jobId,
> >> jobGraph etc., the JobClient summarizes these properties.
> >>
> >> It is good to have a discussion about synchronous/asynchronous
> >> submission now to have a complete execution picture.
> >>
> >> I thought we submit streaming queries mostly async and just wait for the
> >> successful submission. If we block for streaming queries, how can we
> >> collect() or print() results?
> >>
> >> Also, if we block for streaming queries, we could never support
> >> multiline files. Because the first INSERT INTO would block the further
> >> execution.
> >>
> >> If we decide to block entirely on streaming queries, we need the async
> >> execution methods in the design already. However, I would rather go for
> >> non-blocking streaming queries. Also with the `EMIT STREAM` key word in
> >> mind that we might add to SQL statements soon.
> >>
> >> Regards,
> >> Timo
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-16761
> >> [2] https://issues.apache.org/jira/browse/FLINK-12214
> >>
> >> On 25.03.20 16:30, godfrey he wrote:
> >>> Hi Timo,
> >>>
> >>> Thanks for the update.
> >>>
> >>> Regarding "multiline statement support", I'm also fine with
> >>> `TableEnvironment.executeSql()` supporting only single-line statements;
> >>> we can support multiline statements later (this needs more discussion).
> >>>
> >>> Regarding "StatementSet.explain()", I don't have a strong opinion about
> >>> that.
> >>>
> >>> Regarding "TableResult.getJobClient()", I think it's unnecessary. The
> >>> reasons are: first, many statements (e.g. DDL, show xx, use xx) will not
> >>> submit a Flink job; second, `TableEnvironment.executeSql()` and
> >>> `StatementSet.execute()` are synchronous methods, so `TableResult` will be
> >>> returned only after the job has finished or failed.
> >>>
> >>> Regarding "whether StatementSet.execute() needs to throw an exception", I
> >>> think we should choose a unified way to tell whether the execution was
> >>> successful. If `TableResult` contains an ERROR kind (for non-runtime
> >>> exceptions), users need to not only check the result but also catch runtime
> >>> exceptions in their code. Alternatively, `StatementSet.execute()` does not
> >>> throw any exception (including runtime exceptions), and all exception
> >>> messages are in the result. I prefer that "StatementSet.execute() needs to
> >>> throw an exception". cc @Jark Wu <imj...@gmail.com>
> >>>
> >>> I will update the document with the agreed parts first.
> >>>
> >>> Best,
> >>> Godfrey
> >>>
> >>>
> >>> Timo Walther <twal...@apache.org> wrote on Wed, Mar 25, 2020 at 6:51 PM:
> >>>
> >>>> Hi Godfrey,
> >>>>
> >>>> thanks for starting the discussion on the mailing list. And sorry again
> >>>> for the late reply to FLIP-84. I have updated the Google doc one more
> >>>> time to incorporate the offline discussions.
> >>>>
> >>>> From Dawid's and my view, it is fine to postpone the multiline support
> >>>> to a separate method. This can be future work even though we will need
> >>>> it rather soon.
> >>>>
> >>>> If there are no objections, I suggest updating FLIP-84 again and
> >>>> holding another vote.
> >>>>
> >>>> Thanks,
> >>>> Timo
> >>>>
> >>>>
> >>>> On 25.03.20 11:17, godfrey he wrote:
> >>>>> Hi community,
> >>>>> Timo, Fabian and Dawid have given some feedback on FLIP-84 [1]. The
> >>>>> feedback is all about the newly introduced methods. We had a discussion
> >>>>> yesterday, and most of the feedback has been agreed upon. Here are the
> >>>>> conclusions:
> >>>>>
> >>>>> *1. about proposed methods in `TableEnvironment`:*
> >>>>>
> >>>>> the original proposed methods:
> >>>>>
> >>>>> TableEnvironment.createDmlBatch(): DmlBatch
> >>>>> TableEnvironment.executeStatement(String statement): ResultTable
> >>>>>
> >>>>> the new proposed methods:
> >>>>>
> >>>>> // we should not use abbreviations in the API, and the term "Batch" is
> >>>>> // easily confused with batch/streaming processing
> >>>>> TableEnvironment.createStatementSet(): StatementSet
> >>>>>
> >>>>> // every method that takes SQL should have `Sql` in its name
> >>>>> // supports multiline statement ???
> >>>>> TableEnvironment.executeSql(String statement): TableResult
> >>>>>
> >>>>> // new methods. supports explaining DQL and DML
> >>>>> TableEnvironment.explainSql(String statement, ExplainDetail... details): String
> >>>>>
> >>>>>
> >>>>> *2. about proposed related classes:*
> >>>>>
> >>>>> the original proposed classes:
> >>>>>
> >>>>> interface DmlBatch {
> >>>>>        void addInsert(String insert);
> >>>>>        void addInsert(String targetPath, Table table);
> >>>>>        ResultTable execute() throws Exception ;
> >>>>>        String explain(boolean extended);
> >>>>> }
> >>>>>
> >>>>> public interface ResultTable {
> >>>>>        TableSchema getResultSchema();
> >>>>>        Iterable<Row> getResultRows();
> >>>>> }
> >>>>>
> >>>>> the new proposed classes:
> >>>>>
> >>>>> interface StatementSet {
> >>>>>        // every method that takes SQL should have `Sql` in its name
> >>>>>        // return StatementSet instance for fluent programming
> >>>>>        addInsertSql(String statement): StatementSet
> >>>>>
> >>>>>        // return StatementSet instance for fluent programming
> >>>>>        addInsert(String tablePath, Table table): StatementSet
> >>>>>
> >>>>>        // new method. support overwrite mode
> >>>>>        addInsert(String tablePath, Table table, boolean overwrite): StatementSet
> >>>>>
> >>>>>        explain(): String
> >>>>>
> >>>>>        // new method. supports adding more details for the result
> >>>>>        explain(ExplainDetail... extraDetails): String
> >>>>>
> >>>>>        // throw exception ???
> >>>>>        execute(): TableResult
> >>>>> }
> >>>>>
> >>>>> interface TableResult {
> >>>>>        getTableSchema(): TableSchema
> >>>>>
> >>>>>        // avoid custom parsing of an "OK" row in programming
> >>>>>        getResultKind(): ResultKind
> >>>>>
> >>>>>        // instead of `get`, make it explicit that this might trigger
> >>>>>        // an expensive operation
> >>>>>        collect(): Iterable<Row>
> >>>>>
> >>>>>        // for fluent programming
> >>>>>        print(): Unit
> >>>>> }
> >>>>>
> >>>>> enum ResultKind {
> >>>>>        SUCCESS, // for DDL, DCL and statements with a simple "OK"
> >>>>>        SUCCESS_WITH_CONTENT, // rows with important content are available (DML, DQL)
> >>>>> }
> >>>>>
> >>>>>
> >>>>> *3. new proposed methods in `Table`*
> >>>>>
> >>>>> `Table.insertInto()` will be deprecated, and the following methods are
> >>>>> introduced:
> >>>>>
> >>>>> Table.executeInsert(String tablePath): TableResult
> >>>>> Table.executeInsert(String tablePath, boolean overwrite): TableResult
> >>>>> Table.explain(ExplainDetail... details): String
> >>>>> Table.execute(): TableResult
> >>>>>
> >>>>> There are two issues that need further discussion: one is whether
> >>>>> `TableEnvironment.executeSql(String statement): TableResult` needs to
> >>>>> support multiline statements (or whether `TableEnvironment` needs to
> >>>>> support multiline statements), and the other is whether
> >>>>> `StatementSet.execute()` needs to throw an exception.
> >>>>>
> >>>>> please refer to the feedback document [2] for the details.
> >>>>>
> >>>>> Any suggestions are warmly welcomed!
> >>>>>
> >>>>> [1] https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> >>>>> [2] https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit
> >>>>>
> >>>>> Best,
> >>>>> Godfrey
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>
