Hi Timo,

I agree that streaming queries are our top priority, but I think there are
too many things that need to be discussed for multiline statements, e.g.:

1. What is the behavior of mixing DDL and DML for async execution?

create table t1 xxx;
create table t2 xxx;
insert into t2 select * from t1 where xxx;
drop table t1; -- t1 may be a MySQL table; its data will also be deleted.

Here t1 is dropped while the "insert" job is still running.

2. What is the behavior of the unified scenario for async execution?
(as you mentioned)

INSERT INTO t1 SELECT * FROM s;
INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;

The result of the second statement is non-deterministic, because the first
statement may still be running.

I think we would need to put a lot of effort into defining the behavior of
logically related queries. In this FLIP, I suggest we only handle single
statements, and we also introduce an async execute method, which is more
important and more commonly used. For the sync methods (like
`TableEnvironment.executeSql` and `StatementSet.execute`), the result will
only be returned once the job is finished.

The following methods will be introduced in this FLIP:

/**
 * Asynchronously execute the given single statement.
 */
TableEnvironment.executeSqlAsync(String statement): TableResult

/**
 * Asynchronously execute the DML statements as a batch.
 */
StatementSet.executeAsync(): TableResult

public interface TableResult {
    /**
     * Return the JobClient for DQL and DML in async mode,
     * otherwise return Optional.empty().
     */
    Optional<JobClient> getJobClient();
}

What do you think?

Best,
Godfrey

On Thu, Mar 26, 2020 at 9:15 PM Timo Walther <twal...@apache.org> wrote:

> Hi Godfrey,
>
> executing streaming queries must be our top priority because this is
> what distinguishes Flink from competitors. If we change the execution
> behavior, we should think about the other cases as well so as not to
> break the API a third time.
>
> I fear that just having an async execute method will not be enough
> because users should be able to mix streaming and batch queries in a
> unified scenario.
>
> If I remember correctly, we had some discussions in the past about
> what decides the execution mode of a query. Currently, we would
> like to let the query decide, not derive it from the sources.
>
> So I could imagine a multiline pipeline as:
>
> USE CATALOG 'mycat';
> INSERT INTO t1 SELECT * FROM s;
> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
>
> For executeMultilineSql():
>
> sync because regular SQL
> sync because regular Batch SQL
> async because Streaming SQL
>
> For executeAsyncMultilineSql():
>
> async because everything should be async
> async because everything should be async
> async because everything should be async
>
> What we should not do for executeAsyncMultilineSql():
>
> sync because DDL
> async because everything should be async
> async because everything should be async
>
> What are your thoughts here?
>
> Regards,
> Timo
>
>
> On 26.03.20 12:50, godfrey he wrote:
> > Hi Timo,
> >
> > I agree with you that streaming queries mostly need async execution.
> > In fact, our original plan was to introduce only sync methods in this
> > FLIP, with async methods (like "executeSqlAsync") to be introduced in
> > the future, as mentioned in the appendix.
> >
> > Maybe the async methods also need to be considered in this FLIP.
> >
> > I think sync methods are also useful for streaming, as they can be
> > used to run bounded sources.
> > Maybe we should check whether all sources are bounded in sync
> > execution mode.
> >
> >> Also, if we block for streaming queries, we could never support
> >> multiline files. Because the first INSERT INTO would block the further
> >> execution.
> > Agreed. We need an async method to submit multiline files, and such
> > files should be restricted so that DQL and DML always come at the end
> > for streaming.
> >
> > Best,
> > Godfrey
> >
> > On Thu, Mar 26, 2020 at 4:29 PM Timo Walther <twal...@apache.org> wrote:
> >
> >> Hi Godfrey,
> >>
> >> having control over the job after submission is a requirement that was
> >> requested frequently (some examples are [1], [2]). Users would like to
> >> get insights about the running or completed job.
The JobClient
> >> summarizes these properties, including the jobId, the jobGraph, etc.
> >>
> >> It is good to have a discussion about synchronous/asynchronous
> >> submission now, to get a complete execution picture.
> >>
> >> I thought we submit streaming queries mostly async and just wait for
> >> the successful submission. If we block for streaming queries, how can
> >> we collect() or print() results?
> >>
> >> Also, if we block for streaming queries, we could never support
> >> multiline files, because the first INSERT INTO would block the further
> >> execution.
> >>
> >> If we decide to block entirely on streaming queries, we need the async
> >> execution methods in the design already. However, I would rather go
> >> for non-blocking streaming queries, also with the `EMIT STREAM`
> >> keyword in mind that we might add to SQL statements soon.
> >>
> >> Regards,
> >> Timo
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-16761
> >> [2] https://issues.apache.org/jira/browse/FLINK-12214
> >>
> >> On 25.03.20 16:30, godfrey he wrote:
> >>> Hi Timo,
> >>>
> >>> Thanks for the update.
> >>>
> >>> Regarding "multiline statement support", I'm also fine with
> >>> `TableEnvironment.executeSql()` only supporting single statements;
> >>> we can support multiline statements later (this needs more
> >>> discussion).
> >>>
> >>> Regarding "StatementSet.explain()", I don't have strong opinions
> >>> about that.
> >>>
> >>> Regarding "TableResult.getJobClient()", I think it's unnecessary.
> >>> The reasons are: first, many statements (e.g. DDL, show xx, use xx)
> >>> will not submit a Flink job; second, `TableEnvironment.executeSql()`
> >>> and `StatementSet.execute()` are synchronous methods, so
> >>> `TableResult` will be returned only after the job is finished or
> >>> failed.
> >>>
> >>> Regarding "whether StatementSet.execute() needs to throw an
> >>> exception", I think we should choose a unified way to tell whether
> >>> the execution was successful. Either `TableResult` contains an ERROR
> >>> kind (no runtime exception), in which case users need to not only
> >>> check the result but also catch runtime exceptions in their code; or
> >>> `StatementSet.execute()` does not throw any exception at all
> >>> (including runtime exceptions) and all exception messages are in the
> >>> result. I prefer "StatementSet.execute() needs to throw an
> >>> exception". cc @Jark Wu <imj...@gmail.com>
> >>>
> >>> I will update the agreed parts in the document first.
> >>>
> >>> Best,
> >>> Godfrey
> >>>
> >>>
> >>> On Wed, Mar 25, 2020 at 6:51 PM Timo Walther <twal...@apache.org> wrote:
> >>>
> >>>> Hi Godfrey,
> >>>>
> >>>> thanks for starting the discussion on the mailing list. And sorry
> >>>> again for the late reply to FLIP-84. I have updated the Google doc
> >>>> one more time to incorporate the offline discussions.
> >>>>
> >>>> From Dawid's and my view, it is fine to postpone the multiline
> >>>> support to a separate method. This can be future work even though
> >>>> we will need it rather soon.
> >>>>
> >>>> If there are no objections, I suggest updating FLIP-84 again and
> >>>> having another voting process.
> >>>>
> >>>> Thanks,
> >>>> Timo
> >>>>
> >>>>
> >>>> On 25.03.20 11:17, godfrey he wrote:
> >>>>> Hi community,
> >>>>>
> >>>>> Timo, Fabian and Dawid have some feedback about FLIP-84 [1]. The
> >>>>> feedback is all about the newly introduced methods. We had a
> >>>>> discussion yesterday, and most of the feedback has been agreed
> >>>>> upon. Here are the conclusions:
> >>>>>
> >>>>> *1. About the proposed methods in `TableEnvironment`:*
> >>>>>
> >>>>> The originally proposed methods:
> >>>>>
> >>>>> TableEnvironment.createDmlBatch(): DmlBatch
> >>>>> TableEnvironment.executeStatement(String statement): ResultTable
> >>>>>
> >>>>> The newly proposed methods:
> >>>>>
> >>>>> // we should not use abbreviations in the API, and the term
> >>>>> // "Batch" is easily confused with batch/streaming processing
> >>>>> TableEnvironment.createStatementSet(): StatementSet
> >>>>>
> >>>>> // every method that takes SQL should have `Sql` in its name
> >>>>> // supports multiline statements ???
> >>>>> TableEnvironment.executeSql(String statement): TableResult
> >>>>>
> >>>>> // new method. supports explaining DQL and DML
> >>>>> TableEnvironment.explainSql(String statement, ExplainDetail... details): String
> >>>>>
> >>>>>
> >>>>> *2. About the proposed related classes:*
> >>>>>
> >>>>> The originally proposed classes:
> >>>>>
> >>>>> interface DmlBatch {
> >>>>>     void addInsert(String insert);
> >>>>>     void addInsert(String targetPath, Table table);
> >>>>>     ResultTable execute() throws Exception;
> >>>>>     String explain(boolean extended);
> >>>>> }
> >>>>>
> >>>>> public interface ResultTable {
> >>>>>     TableSchema getResultSchema();
> >>>>>     Iterable<Row> getResultRows();
> >>>>> }
> >>>>>
> >>>>> The newly proposed classes:
> >>>>>
> >>>>> interface StatementSet {
> >>>>>     // every method that takes SQL should have `Sql` in its name
> >>>>>     // return the StatementSet instance for fluent programming
> >>>>>     addInsertSql(String statement): StatementSet
> >>>>>
> >>>>>     // return the StatementSet instance for fluent programming
> >>>>>     addInsert(String tablePath, Table table): StatementSet
> >>>>>
> >>>>>     // new method. supports overwrite mode
> >>>>>     addInsert(String tablePath, Table table, boolean overwrite): StatementSet
> >>>>>
> >>>>>     explain(): String
> >>>>>
> >>>>>     // new method. supports adding more details to the result
> >>>>>     explain(ExplainDetail... extraDetails): String
> >>>>>
> >>>>>     // throw exception ???
> >>>>>     execute(): TableResult
> >>>>> }
> >>>>>
> >>>>> interface TableResult {
> >>>>>     getTableSchema(): TableSchema
> >>>>>
> >>>>>     // avoid custom parsing of an "OK" row in programming
> >>>>>     getResultKind(): ResultKind
> >>>>>
> >>>>>     // instead of `get`, make it explicit that this might be
> >>>>>     // triggering an expensive operation
> >>>>>     collect(): Iterable<Row>
> >>>>>
> >>>>>     // for fluent programming
> >>>>>     print(): Unit
> >>>>> }
> >>>>>
> >>>>> enum ResultKind {
> >>>>>     SUCCESS, // for DDL, DCL and statements with a simple "OK"
> >>>>>     SUCCESS_WITH_CONTENT, // rows with important content are
> >>>>>                           // available (DML, DQL)
> >>>>> }
> >>>>>
> >>>>>
> >>>>> *3. Newly proposed methods in `Table`:*
> >>>>>
> >>>>> `Table.insertInto()` will be deprecated, and the following methods
> >>>>> are introduced:
> >>>>>
> >>>>> Table.executeInsert(String tablePath): TableResult
> >>>>> Table.executeInsert(String tablePath, boolean overwrite): TableResult
> >>>>> Table.explain(ExplainDetail... details): String
> >>>>> Table.execute(): TableResult
> >>>>>
> >>>>> There are two issues that need further discussion: one is whether
> >>>>> `TableEnvironment.executeSql(String statement): TableResult` needs
> >>>>> to support multiline statements (or whether `TableEnvironment`
> >>>>> needs to support multiline statements at all), and the other is
> >>>>> whether `StatementSet.execute()` needs to throw an exception.
> >>>>>
> >>>>> Please refer to the feedback document [2] for the details.
> >>>>>
> >>>>> Any suggestions are warmly welcomed!
> >>>>>
> >>>>> [1]
> >>>>> https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> >>>>> [2]
> >>>>> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit
> >>>>>
> >>>>> Best,
> >>>>> Godfrey
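[Editor's note] To make the `TableResult.getJobClient()` contract discussed in
this thread concrete, here is a minimal, self-contained Java sketch. These are
hypothetical mock interfaces modeled on the proposal, not Flink's actual
classes (`Flip84Sketch`, `withJob`, and `noJob` are invented names for
illustration): DDL-like statements return `Optional.empty()`, while async
DQL/DML return a handle to the submitted job.

```java
import java.util.Optional;

public class Flip84Sketch {

    /** Hypothetical stand-in for Flink's JobClient: a handle to a submitted job. */
    interface JobClient {
        String getJobId();
    }

    /** Mock of the proposed TableResult: a JobClient only exists for async DQL/DML. */
    interface TableResult {
        Optional<JobClient> getJobClient();
    }

    /** A TableResult for a statement that submitted a job (e.g. async INSERT INTO). */
    static TableResult withJob(String jobId) {
        JobClient client = () -> jobId;
        return () -> Optional.of(client);
    }

    /** A TableResult for statements that run no job (e.g. CREATE TABLE, USE CATALOG). */
    static TableResult noJob() {
        return Optional::empty;
    }

    public static void main(String[] args) {
        TableResult ddl = noJob();           // e.g. executeSqlAsync("CREATE TABLE ...")
        TableResult dml = withJob("job-42"); // e.g. executeSqlAsync("INSERT INTO ...")

        // A caller checks for a job handle instead of parsing an "OK" row.
        System.out.println(ddl.getJobClient().isPresent());                        // false
        System.out.println(dml.getJobClient().map(JobClient::getJobId).orElse("none"));
    }
}
```

The point of the sketch is the asymmetry Godfrey describes: callers must
always handle the empty case, since many statements (DDL, `show xx`,
`use xx`) never submit a job.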