Hi everyone,

I would like to bring up a discussion about the result type of the DESCRIBE statement, which was introduced in FLIP-84 [1]. In the previous version, we defined the result of the `DESCRIBE` statement as a single column, as follows:
    Statement:     DESCRIBE xx
    Result Schema: field name: result, field type: VARCHAR(n) (n is the max length of values)
    Result Value:  describes the details of an object (single row)
    Result Kind:   SUCCESS_WITH_CONTENT
    Examples:      DESCRIBE table_name

For "describe table_name", the result value is the `toString` value of `TableSchema`, which is unstructured data. This makes it hard for users to consume the information. For example:

    TableSchema schema = TableSchema.builder()
        .field("f0", DataTypes.BIGINT())
        .field("f1", DataTypes.ROW(
            DataTypes.FIELD("q1", DataTypes.STRING()),
            DataTypes.FIELD("q2", DataTypes.TIMESTAMP(3))))
        .field("f2", DataTypes.STRING())
        .field("f3", DataTypes.BIGINT(), "f0 + 1")
        .watermark("f1.q2", WATERMARK_EXPRESSION, WATERMARK_DATATYPE)
        .build();

Its `toString` value is:

    root
     |-- f0: BIGINT
     |-- f1: ROW<`q1` STRING, `q2` TIMESTAMP(3)>
     |-- f2: STRING
     |-- f3: BIGINT AS f0 + 1
     |-- WATERMARK FOR f1.q2 AS now()

In Hive, MySQL, etc., the DESCRIBE result is in table form, including field names and field types, which is more familiar to users. TableSchema [2] also has watermark expressions and computed columns, so we should put them into the table as well. A computed column is column-level information, so we add a new column named `expr`. A watermark expression is table-level information, so we add a special row named `WATERMARK` to represent it. For the example above, the result will look like this:

    name       type                                  expr
    f0         BIGINT                                (NULL)
    f1         ROW<`q1` STRING, `q2` TIMESTAMP(3)>   (NULL)
    f2         STRING                                (NULL)
    f3         BIGINT                                f0 + 1
    WATERMARK  (NULL)                                f1.q2 AS now()

There is now a PR for FLINK-17112 [3] that implements the DESCRIBE statement. What do you think about this update?
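To make the proposed layout concrete, here is a minimal, self-contained Java sketch of how column and watermark information could be flattened into `name`/`type`/`expr` rows with a trailing `WATERMARK` row. It deliberately does not use Flink's `TableSchema`: the `Column` and `Watermark` records and the `describe`/`render` helpers are hypothetical stand-ins for illustration, not the implementation in the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: flattens column/watermark info into DESCRIBE rows
// of the form [name, type, expr]. Not Flink's TableSchema API.
class DescribeSketch {

    // A physical or computed column; expr is null for physical columns.
    record Column(String name, String type, String expr) {}

    // A table-level watermark definition, e.g. on "f1.q2" with "now()".
    record Watermark(String rowtimeAttribute, String expression) {}

    static List<String[]> describe(List<Column> columns, Watermark watermark) {
        List<String[]> rows = new ArrayList<>();
        for (Column c : columns) {
            // Column-level info: computed columns carry their expression in "expr".
            rows.add(new String[] {c.name(), c.type(), c.expr()});
        }
        if (watermark != null) {
            // Table-level info: one special row named WATERMARK, with no type.
            rows.add(new String[] {
                "WATERMARK", null,
                watermark.rowtimeAttribute() + " AS " + watermark.expression()});
        }
        return rows;
    }

    // Display only: nulls are printed as "(NULL)", like the proposed table above.
    static String render(List<String[]> rows) {
        StringBuilder sb = new StringBuilder("name | type | expr\n");
        for (String[] row : rows) {
            for (int i = 0; i < row.length; i++) {
                if (i > 0) {
                    sb.append(" | ");
                }
                sb.append(row[i] == null ? "(NULL)" : row[i]);
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<Column> columns = List.of(
            new Column("f0", "BIGINT", null),
            new Column("f1", "ROW<`q1` STRING, `q2` TIMESTAMP(3)>", null),
            new Column("f2", "STRING", null),
            new Column("f3", "BIGINT", "f0 + 1"));
        System.out.print(render(describe(columns, new Watermark("f1.q2", "now()"))));
    }
}
```

In this sketch, absent values stay as real `null`s in the rows and only become `(NULL)` when rendered, so a client could still consume the rows as structured data instead of parsing a `toString` dump.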
Any feedback is welcome!

Best,
Godfrey

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
[2] https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
[3] https://github.com/apache/flink/pull/11892

On Mon, Apr 6, 2020 at 10:38 PM, godfrey he <godfre...@gmail.com> wrote:

Hi Timo,

Sorry for the late reply, and thanks for your correction. I missed DQL for the job submission scenario. I'll fix the document right away.

Best,
Godfrey

On Fri, Apr 3, 2020 at 9:53 PM, Timo Walther <twal...@apache.org> wrote:

Hi Godfrey,

I'm sorry to jump in again, but I still need to clarify some things around TableResult.

The FLIP says:
"For DML, this method returns TableResult until the job is submitted. For other statements, TableResult is returned until the execution is finished."

I thought we agreed on making every execution async? This also means returning a TableResult for DQLs even though the execution is not done yet. People need access to the JobClient also for batch jobs in order to cancel long-lasting queries. If people want to wait for the completion, they can hook into JobClient or collect().

Can we rephrase this part to:
"For DML and DQL, this method returns TableResult once the job has been submitted. For DDL and DCL statements, TableResult is returned once the operation has finished."

Regards,
Timo

On 02.04.20 05:27, godfrey he wrote:

Hi Aljoscha, Dawid, Timo,

Thanks so much for the detailed explanation. I agree with you that the multiline story is not complete yet, and we can keep discussing it. I will add the current discussions and conclusions to the FLIP.

Best,
Godfrey

On Wed, Apr 1, 2020 at 11:27 PM, Timo Walther <twal...@apache.org> wrote:

Hi Godfrey,

First of all, I agree with Dawid.
The multiline story is not completed by this FLIP. It just verifies the big picture.

1. "control the execution logic through the proposed method if they know what the statements are"

This is a good point that Fabian also raised in the linked Google doc. I could also imagine returning a more complicated POJO when calling `executeMultiSql()`.

The POJO would include some `getSqlProperties()` such that a platform gets insights into the query before executing it. We could also trigger the execution more explicitly instead of hiding it behind an iterator.

2. "there are some special commands introduced in SQL client"

For platform- and SQL Client-specific commands, we could offer a hook to the parser, or a fallback parser in case the regular table environment parser cannot deal with the statement.

However, all of that is future work and can be discussed in a separate FLIP.

3. +1 for the `Iterator` instead of `Iterable`.

4. "we should convert the checked exception to unchecked exception"

Yes, I meant using a runtime exception instead of a checked exception. There was no consensus on putting the exception into the `TableResult`.

Regards,
Timo

On 01.04.20 15:35, Dawid Wysakowicz wrote:

When considering the multi-line support, I think it is helpful to start with a use case in mind. In my opinion, the consumers of this method will be:

1. sql-client
2. third-party SQL-based platforms

@Godfrey As for the quit/source/... commands, I think those are the responsibility of the aforementioned consumers. I think they should not be understood by the TableEnvironment. What would quit on a TableEnvironment do? Moreover, I think such commands should be prefixed appropriately. I think it's a common practice to e.g.
prefix those with ! or : to say they are meta commands of the tool rather than a query.

I also don't necessarily understand why platform users need to know the kind of the query to use the proposed method. They should get the type from TableResult#ResultKind. If the ResultKind is SUCCESS, it was a DCL/DDL. If it is SUCCESS_WITH_CONTENT, it was a DML/DQL. If that's not enough, we can enrich the TableResult with a more explicit kind of query, but so far I don't see such a need.

@Kurt In those cases, I would assume the developers want to present the results of the queries anyway. Moreover, I think it is safe to assume they can adhere to a contract that the results must be iterated.

For direct users of TableEnvironment/Table API, this method does not make much sense anyway, in my opinion. I think we can rather safely assume that in this scenario they do not want to submit multiple queries at a single time.

Best,
Dawid

On 01/04/2020 15:07, Kurt Young wrote:

One comment on `executeMultilineSql`: I'm afraid users might sometimes forget to iterate the returned iterators, e.g. a user submits a bunch of DDLs and expects the framework to execute them one by one. But it didn't.

Best,
Kurt

On Wed, Apr 1, 2020 at 5:10 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Agreed with what Dawid and Timo said.

To answer your question about multi-line SQL: no, we don't think we need this in Flink 1.11; we only wanted to make sure that the interfaces we now put in place will potentially allow this in the future.
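Kurt's concern, together with Timo's idea (in his March 31 mail quoted further down) that `Iterator#next()` triggers the next submission, can be illustrated with a small Java sketch. `executeMultilineSql` below is a hypothetical stand-in, not the Flink API: it returns a one-shot `Iterator` (matching Dawid's point that results cannot be re-iterated) and submits a statement only when `next()` is called, so a caller that never iterates executes nothing. The naive split on `;` is an assumption that ignores semicolons inside literals or comments.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Hypothetical sketch of a lazily executing multi-statement API.
class MultilineSketch {

    // Returns a one-shot iterator; each next() call submits exactly one statement.
    // `submit` stands in for whatever would actually execute a statement.
    static <R> Iterator<R> executeMultilineSql(String statements, Function<String, R> submit) {
        // Naive split on ';' (assumption: no ';' inside literals or comments).
        List<String> parts = new ArrayList<>();
        for (String s : statements.split(";")) {
            if (!s.isBlank()) {
                parts.add(s.trim());
            }
        }
        return new Iterator<R>() {
            private int position = 0;

            @Override
            public boolean hasNext() {
                return position < parts.size();
            }

            @Override
            public R next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                // Submission happens here, not when the iterator is created.
                return submit.apply(parts.get(position++));
            }
        };
    }

    public static void main(String[] args) {
        List<String> executed = new ArrayList<>();
        Iterator<String> results = executeMultilineSql(
            "CREATE TABLE t1 (a INT); CREATE TABLE t2 (a INT);",
            stmt -> {
                executed.add(stmt);
                return "OK: " + stmt;
            });
        // Nothing has run yet: forgetting to iterate means nothing executes.
        System.out.println("executed before iterating: " + executed.size());
        while (results.hasNext()) {
            System.out.println(results.next());
        }
        System.out.println("executed after iterating: " + executed.size());
    }
}
```

The laziness is what lets a caller such as the SQL Client decide when each statement is submitted; the flip side, as Kurt points out, is that an un-iterated result silently does nothing.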
Best,
Aljoscha

On 01.04.20 09:31, godfrey he wrote:

Hi Timo & Dawid,

Thanks so much for the effort on multiline statement support. I have a few questions about this method:

1. Users can control the execution logic well through the proposed method if they know what the statements are (whether a statement is a DDL, a DML, or something else). But if the statements come from a file, users do not know what they are, and the execution behavior is unclear. As a platform user, I think this method is hard to use unless the platform defines a set of rules about the statement order, such as: no SELECT in the middle, and DML must be at the tail of the SQL file (which may be the most common case in production environments). Otherwise, the platform must parse the SQL first to know what the statements are. If it does that, the platform can handle all cases through `executeSql` and `StatementSet`.

2. The SQL client also can't use `executeMultilineSql` to support multiline statements, because there are some special commands introduced in the SQL client, such as `quit`, `source`, and `load jar` (which does not exist now, but we may need this command to support dynamic table sources and UDFs). Does TableEnvironment also support those commands?

3. By the way, must we have this feature in release-1.11? I find there are some use cases in the feedback document whose behavior is still unclear.

Regarding "change the return value from `Iterable<Row>` to `Iterator<Row>`", I couldn't agree more with this change.
Just as Dawid mentioned, "The contract of the Iterable#iterator is that it returns a new iterator each time, which effectively means we can iterate the results multiple times." We do not support iterating the results multiple times. If we wanted to, the client would have to buffer all results, which is impossible for a streaming job.

Best,
Godfrey

On Wed, Apr 1, 2020 at 3:14 AM, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Thank you Timo for the great summary! It covers (almost) all the topics. Even though in the end we are not suggesting many changes to the current state of the FLIP, I think it is important to lay out all possible use cases so that we do not change the execution model every release.

There is one additional thing we discussed. Could we change the result type of TableResult#collect to Iterator<Row>? Even though those interfaces do not differ much, I think Iterator better describes that the results might not be materialized on the client side, but can be retrieved on a per-record basis. The contract of the Iterable#iterator is that it returns a new iterator each time, which effectively means we can iterate the results multiple times. Iterating the results multiple times is not possible when we don't retrieve all the results from the cluster at once. I think we should also use Iterator for TableEnvironment#executeMultilineSql(String statements): Iterator<TableResult>.

Best,
Dawid

On 31/03/2020 19:27, Timo Walther wrote:

Hi Godfrey,

Aljoscha, Dawid, Klou, and I had another discussion around FLIP-84.
In particular, we discussed how the current status of the FLIP and the future requirements around multiline statements, async/sync, and collect() fit together.

We also updated the FLIP-84 Feedback Summary document [1] with some use cases.

We believe that we found a good solution that also fits what is in the current FLIP. So no bigger changes are necessary, which is great!

Our findings were:

1. Async vs. sync submission of Flink jobs:

Having a blocking `execute()` in the DataStream API was rather a mistake. Instead, all submissions should be async because this allows supporting both modes if necessary. Thus, submitting all queries async sounds good to us. If users want to run a job synchronously, they can use the JobClient and wait for completion (or collect() in case of batch jobs).

2. Multi-statement execution:

For the multi-statement execution, we don't see a contradiction with the async execution behavior. We imagine a method like:

    TableEnvironment#executeMultilineSql(String statements): Iterable<TableResult>

where the `Iterator#next()` method would trigger the next statement submission. This allows a caller to decide synchronously when to submit statements async to the cluster. Thus, a service such as the SQL Client can handle the result of each statement individually and process the statements sequentially.

3. The role of TableResult and result retrieval in general

`TableResult` is similar to `JobClient`.
Instead of returning a `CompletableFuture` of something, it is a concrete util class where some methods behave like a completable future (e.g. collect(), print()) and some are already completed (getTableSchema(), getResultKind()).

`StatementSet#execute()` returns a single `TableResult` because the order is undefined in a set and all statements have the same schema. Its `collect()` will return a row for each executed `INSERT INTO` in the order of statement definition.

For a simple `SELECT * FROM ...`, the query execution might block until `collect()` is called to pull buffered rows from the job (via a socket, the REST API, or whatever we will use in the future). We can say that a statement finished successfully when `collect#Iterator#hasNext` has returned false.

I hope this summarizes our discussion @Dawid/Aljoscha/Klou?

It would be great if we could add these findings to the FLIP before we start voting.

One minor thing: some `execute()` methods still throw a checked exception; can we remove that from the FLIP? Also, the above-mentioned `Iterator#next()` would trigger an execution without throwing a checked exception.

Thanks,
Timo

[1] https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit#

On 31.03.20 06:28, godfrey he wrote:

Hi Timo & Jark,

Thanks for your explanation. I agree with you that execution should always be async, and the sync execution scenario can be covered by async execution.
It helps provide a unified entry point for batch and streaming. I think we can also use sync execution for some testing. So, I agree with you that we provide the `executeSql` method and that it is async. If we want a sync method in the future, we can add a method named `executeSqlSync`.

I think we've reached an agreement. I will update the document and start the voting process.

Best,
Godfrey

On Tue, Mar 31, 2020 at 12:46 AM, Jark Wu <imj...@gmail.com> wrote:

Hi,

I didn't follow the full discussion, but I share Timo's concern that streaming queries should always be async. Otherwise, I can imagine it will cause a lot of confusion and problems if users don't keep the "sync" behavior firmly in mind (e.g. the client hangs). Besides, streaming is still the majority use case of Flink and Flink SQL. We should give usability a high priority.

Best,
Jark

On Mon, 30 Mar 2020 at 23:27, Timo Walther <twal...@apache.org> wrote:

Hi Godfrey,

maybe I didn't express my biggest concern clearly enough in my last mail. Even in single-line, sync execution, I think that streaming queries should not block the execution. Otherwise, it is not possible to call collect() or print() on them afterwards.

"there are too many things need to discuss for multiline":

True, I don't want to solve all of them right now.
But what I >> >> know >> >>>>> is >> >>>>>>>>>>> that our newly introduced methods should fit into a multiline >> >>>>>>>>>>> execution. >> >>>>>>>>>>> There is no big difference of calling `executeSql(A), >> >>>>>>>>>>> executeSql(B)` and >> >>>>>>>>>>> processing a multiline file `A;\nB;`. >> >>>>>>>>>>> >> >>>>>>>>>>> I think the example that you mentioned can simply be undefined >> >> for >> >>>>>>>>>>> now. >> >>>>>>>>>>> Currently, no catalog is modifying data but just metadata. >> This >> >> is a >> >>>>>>>>>>> separate discussion. >> >>>>>>>>>>> >> >>>>>>>>>>> "result of the second statement is indeterministic": >> >>>>>>>>>>> >> >>>>>>>>>>> Sure this is indeterministic. But this is the implementers >> fault >> >>>>>>>>>>> and we >> >>>>>>>>>>> cannot forbid such pipelines. >> >>>>>>>>>>> >> >>>>>>>>>>> How about we always execute streaming queries async? It would >> >>>>> unblock >> >>>>>>>>>>> executeSql() and multiline statements. >> >>>>>>>>>>> >> >>>>>>>>>>> Having a `executeSqlAsync()` is useful for batch. However, I >> >> don't >> >>>>>>>>>>> want >> >>>>>>>>>>> `sync/async` be the new batch/stream flag. The execution >> behavior >> >>>>>>>>>>> should >> >>>>>>>>>>> come from the query itself. >> >>>>>>>>>>> >> >>>>>>>>>>> Regards, >> >>>>>>>>>>> Timo >> >>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>>> On 30.03.20 11:12, godfrey he wrote: >> >>>>>>>>>>>> Hi Timo, >> >>>>>>>>>>>> >> >>>>>>>>>>>> Agree with you that streaming queries is our top priority, >> >>>>>>>>>>>> but I think there are too many things need to discuss for >> >> multiline >> >>>>>>>>>>>> statements: >> >>>>>>>>>>>> e.g. >> >>>>>>>>>>>> 1. what's the behaivor of DDL and DML mixing for async >> >> execution: >> >>>>>>>>>>>> create table t1 xxx; >> >>>>>>>>>>>> create table t2 xxx; >> >>>>>>>>>>>> insert into t2 select * from t1 where xxx; >> >>>>>>>>>>>> drop table t1; // t1 may be a MySQL table, the data will >> also be >> >>>>>>>>>> deleted. 
t1 is dropped while the "insert" job is running.

2. What is the behavior of the unified scenario in async execution (as you mentioned):

    INSERT INTO t1 SELECT * FROM s;
    INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;

The result of the second statement is indeterministic, because the first statement may still be running. I think we need to put a lot of effort into defining the behavior of logically related queries.

In this FLIP, I suggest we only handle single statements, and we also introduce an async execute method, which is more important and more often used by users.

For the sync methods (like `TableEnvironment.executeSql` and `StatementSet.execute`), the result will be returned only after the job is finished. The following methods will be introduced in this FLIP:

    /**
     * Asynchronously execute the given single statement
     */
    TableEnvironment.executeSqlAsync(String statement): TableResult

    /**
     * Asynchronously execute the dml statements as a batch
     */
    StatementSet.executeAsync(): TableResult

    public interface TableResult {
        /**
         * return JobClient for DQL and DML in async mode, else return Optional.empty
         */
        Optional<JobClient> getJobClient();
    }

What do you think?
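The async-by-default model discussed in this thread (return once a job is submitted, and let callers wait if they want sync behavior) can be sketched in plain Java with `CompletableFuture`. All names below (`SketchJobClient`, `SketchTableResult`, `executeSqlAsync`, `await`) are hypothetical stand-ins rather than Flink classes, and classifying statements by their leading keyword is a simplifying assumption.

```java
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of "async by default, sync by waiting".
class AsyncSketch {

    // Stand-in for a job handle; the future completes when the job finishes.
    record SketchJobClient(CompletableFuture<Void> completion) {}

    // Stand-in for TableResult: a job client exists only if a job was submitted.
    record SketchTableResult(String resultKind, Optional<SketchJobClient> jobClient) {}

    // Returns as soon as a DML/DQL job is submitted; other statements
    // (e.g. DDL) finish before returning. The keyword check is a
    // simplification made only for this sketch.
    static SketchTableResult executeSqlAsync(String statement, Runnable work) {
        String head = statement.trim().toUpperCase(Locale.ROOT);
        if (head.startsWith("INSERT") || head.startsWith("SELECT")) {
            CompletableFuture<Void> done = CompletableFuture.runAsync(work);
            return new SketchTableResult(
                "SUCCESS_WITH_CONTENT", Optional.of(new SketchJobClient(done)));
        }
        work.run();
        return new SketchTableResult("SUCCESS", Optional.empty());
    }

    // Sync behavior layered on top: block only if a job was submitted.
    static void await(SketchTableResult result) {
        result.jobClient().ifPresent(client -> client.completion().join());
    }

    public static void main(String[] args) {
        SketchTableResult ddl = executeSqlAsync("CREATE TABLE t1 (a INT)", () -> {});
        System.out.println("DDL has job client: " + ddl.jobClient().isPresent());

        SketchTableResult dml = executeSqlAsync("INSERT INTO t1 SELECT 1", () -> {});
        await(dml);
        System.out.println("DML finished: " + dml.jobClient().get().completion().isDone());
    }
}
```

In this sketch, the synchronous variant needs no separate code path: it is the async call plus a wait on the optional job handle, which mirrors the argument that sync execution can be covered by async execution.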
Best,
Godfrey

On Thu, Mar 26, 2020 at 9:15 PM, Timo Walther <twal...@apache.org> wrote:

Hi Godfrey,

executing streaming queries must be our top priority because this is what distinguishes Flink from its competitors. If we change the execution behavior, we should think about the other cases as well, so as not to break the API a third time.

I fear that just having an async execute method will not be enough, because users should be able to mix streaming and batch queries in a unified scenario.

If I remember correctly, we had some discussions in the past about what decides the execution mode of a query. Currently, we would like to let the query decide, not derive it from the sources.
So I could imagine a multiline pipeline as:

    USE CATALOG 'mycat';
    INSERT INTO t1 SELECT * FROM s;
    INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;

For executeMultilineSql():

    sync because regular SQL
    sync because regular Batch SQL
    async because Streaming SQL

For executeAsyncMultilineSql():

    async because everything should be async
    async because everything should be async
    async because everything should be async

What we should not start for executeAsyncMultilineSql():

    sync because DDL
    async because everything should be async
    async because everything should be async

What are your thoughts here?

Regards,
Timo

On 26.03.20 12:50, godfrey he wrote:

Hi Timo,

I agree with you that streaming queries mostly need async execution. In fact, our original plan was to introduce only sync methods in this FLIP; async methods (like "executeSqlAsync") would be introduced in the future, as mentioned in the appendix.

Maybe the async methods also need to be considered in this FLIP.

I think sync methods are also useful for streaming, where they can be used to run bounded sources.
Maybe we should check whether all sources are bounded in sync execution mode.

> Also, if we block for streaming queries, we could never support
> multiline files. Because the first INSERT INTO would block the further
> execution.

I agree with you: we need an async method to submit multiline files, and such files should be restricted so that, for streaming, the DQL and DML statements always come at the end.

Best,
Godfrey

On Thu, Mar 26, 2020 at 4:29 PM, Timo Walther <twal...@apache.org> wrote:

Hi Godfrey,

having control over the job after submission is a frequently requested feature (some examples are [1], [2]). Users would like to get insights into the running or completed job. The JobClient summarizes these properties, including the jobId, jobGraph, etc.

It is good to have a discussion about synchronous/asynchronous submission now, to have a complete picture of the execution.

I thought we submit streaming queries mostly async and just wait for the successful submission. If we block for streaming queries, how can we collect() or print() results?

Also, if we block for streaming queries, we could never support multiline files.
Because the first INSERT INTO would block the further execution.

If we decide to block entirely on streaming queries, we need the async execution methods in the design already. However, I would rather go for non-blocking streaming queries, also with the `EMIT STREAM` keyword in mind that we might add to SQL statements soon.

Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-16761
[2] https://issues.apache.org/jira/browse/FLINK-12214

On 25.03.20 16:30, godfrey he wrote:

Hi Timo,

Thanks for the update.

Regarding "multiline statement support", I'm also fine with `TableEnvironment.executeSql()` only supporting single-line statements; we can support multiline statements later (this needs more discussion).

Regarding "StatementSet.explain()", I don't have strong opinions about that.

Regarding "TableResult.getJobClient()", I think it's unnecessary. The reasons are: first, many statements (e.g. DDL, show xx, use xx) will not submit a Flink job.
Second, `TableEnvironment.executeSql()` and `StatementSet.execute()` are synchronous methods; `TableResult` will be returned only after the job has finished or failed.

Regarding "whether StatementSet.execute() needs to throw an exception", I think we should choose a unified way to tell whether the execution is successful. If `TableResult` contains an ERROR kind (for non-runtime exceptions), users need to not only check the result but also catch runtime exceptions in their code. Alternatively, `StatementSet.execute()` throws no exception at all (including runtime exceptions), and all exception messages are in the result. I prefer "StatementSet.execute() needs to throw exception". cc @Jark Wu <imj...@gmail.com>

I will update the document with the agreed parts first.

Best,
Godfrey

On Wed, Mar 25, 2020 at 6:51 PM, Timo Walther <twal...@apache.org> wrote:

Hi Godfrey,

thanks for starting the discussion on the mailing list. And sorry again for the late reply to FLIP-84. I have updated the Google doc one more time to incorporate the offline discussions.
From Dawid's and my point of view, it is fine to postpone the multiline support to a separate method. This can be future work, even though we will need it rather soon.

If there are no objections, I suggest updating FLIP-84 again and starting another voting process.

Thanks,
Timo

On 25.03.20 11:17, godfrey he wrote:

Hi community,

Timo, Fabian and Dawid have some feedback about FLIP-84 [1]. The feedback is all about the newly introduced methods. We had a discussion yesterday, and most of the feedback has been agreed upon. Here are the conclusions:

*1. About proposed methods in `TableEnvironment`:*

The originally proposed methods:

    TableEnvironment.createDmlBatch(): DmlBatch
    TableEnvironment.executeStatement(String statement): ResultTable

The newly proposed methods:

    // we should not use abbreviations in the API, and the term "Batch" is
    // easily confused with batch/streaming processing
    TableEnvironment.createStatementSet(): StatementSet

    // every method that takes SQL should have `Sql` in its name
    // supports multiline statement ???
    TableEnvironment.executeSql(String statement): TableResult

    // new method. supports explaining DQL and DML
    TableEnvironment.explainSql(String statement, ExplainDetail... details): String

*2. About proposed related classes:*

The originally proposed classes:

    interface DmlBatch {
        void addInsert(String insert);
        void addInsert(String targetPath, Table table);
        ResultTable execute() throws Exception;
        String explain(boolean extended);
    }

    public interface ResultTable {
        TableSchema getResultSchema();
        Iterable<Row> getResultRows();
    }

The newly proposed classes:

    interface StatementSet {
        // every method that takes SQL should have `Sql` in its name
        // return StatementSet instance for fluent programming
        addInsertSql(String statement): StatementSet

        // return StatementSet instance for fluent programming
        addInsert(String tablePath, Table table): StatementSet

        // new method. support overwrite mode
        addInsert(String tablePath, Table table, boolean overwrite): StatementSet

        explain(): String

        // new method. supports adding more details for the result
        explain(ExplainDetail... extraDetails): String

        // throw exception ???
        execute(): TableResult
    }

    interface TableResult {
        getTableSchema(): TableSchema

        // avoid custom parsing of an "OK" row in programming
        getResultKind(): ResultKind

        // instead of `get`, make it explicit that this
        // might trigger an expensive operation
        collect(): Iterable<Row>

        // for fluent programming
        print(): Unit
    }

    enum ResultKind {
        SUCCESS,              // for DDL, DCL and statements with a simple "OK"
        SUCCESS_WITH_CONTENT, // rows with important content are available (DML, DQL)
    }

*3. Newly proposed methods in `Table`*

`Table.insertInto()` will be deprecated, and the following methods are introduced:

    Table.executeInsert(String tablePath): TableResult
    Table.executeInsert(String tablePath, boolean overwrite): TableResult
    Table.explain(ExplainDetail...
details): String >> >>>>>>>>>>>>>>>>>> Table.execute(): TableResult >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> There are two issues need further discussion, one is >> >> whether >> >>>>>>>>>>>>>>>>>> `TableEnvironment.executeSql(String statement): >> >> TableResult` >> >>>>>>>>>> needs >> >>>>>>>>>>> to >> >>>>>>>>>>>>>>>>>> support multiline statement (or whether >> `TableEnvironment` >> >>>>>>>>>>>>>>>>>> needs >> >>>>>>>>>> to >> >>>>>>>>>>>>>>>>> support >> >>>>>>>>>>>>>>>>>> multiline statement), and another one is whether >> >>>>>>>>>>>>>>> `StatementSet.execute()` >> >>>>>>>>>>>>>>>>>> needs to throw exception. >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> please refer to the feedback document [2] for the >> details. >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> Any suggestions are warmly welcomed! >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> [1] >> >>>>>>>>>>>>>>>>>> >> >>>>> >> >> >> https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878 >> >>>>>>>>>>>>>>>>>> [2] >> >>>>>>>>>>>>>>>>>> >> >>>>> >> >> >> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit >> >>>>>>>>>>>>>>>>>> Best, >> >>>>>>>>>>>>>>>>>> Godfrey >> >>>>>>>>>>>>>>>>>> >> >> >> >> >> > >> >>
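The `TableResult` / `ResultKind` shape proposed in this thread can be illustrated with a small, self-contained Java sketch. It shows how callers can branch on the result kind instead of parsing an "OK" row. All names here are hypothetical stand-ins, not Flink's actual classes. Rows are modeled as `List<Object>` and the schema as a plain list of column names, so the sketch has no dependencies.

```java
import java.util.List;

/**
 * Hypothetical, dependency-free stand-ins for the types proposed in this thread.
 * These are NOT Flink classes: the schema is a list of column names and each row
 * is a List<Object>.
 */
public class TableResultSketch {

    /** Mirrors the proposed ResultKind enum. */
    enum ResultKind {
        SUCCESS,              // DDL, DCL and statements with a simple "OK"
        SUCCESS_WITH_CONTENT  // rows with important content are available (DML, DQL)
    }

    /** Mirrors the proposed TableResult methods (simplified types). */
    interface TableResult {
        List<String> getTableSchema();
        ResultKind getResultKind();
        Iterable<List<Object>> collect(); // in a real engine this may be expensive
        void print();
    }

    /** In-memory implementation, for illustration only. */
    static final class InMemoryTableResult implements TableResult {
        private final List<String> schema;
        private final ResultKind kind;
        private final List<List<Object>> rows;

        InMemoryTableResult(List<String> schema, ResultKind kind, List<List<Object>> rows) {
            this.schema = List.copyOf(schema);
            this.kind = kind;
            this.rows = List.copyOf(rows);
        }

        public List<String> getTableSchema() { return schema; }
        public ResultKind getResultKind() { return kind; }
        public Iterable<List<Object>> collect() { return rows; }

        public void print() {
            System.out.println(String.join(" | ", schema));
            for (List<Object> row : rows) {
                System.out.println(row);
            }
        }
    }

    /** Branches on the enum instead of inspecting a textual "OK" row. */
    static int countContentRows(TableResult result) {
        if (result.getResultKind() == ResultKind.SUCCESS) {
            return 0; // the statement simply succeeded; nothing to read
        }
        int n = 0;
        for (List<Object> ignored : result.collect()) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        TableResult ddl = new InMemoryTableResult(
                List.of("result"), ResultKind.SUCCESS, List.of(List.of("OK")));
        TableResult query = new InMemoryTableResult(
                List.of("f0", "f2"), ResultKind.SUCCESS_WITH_CONTENT,
                List.of(List.of(1L, "a"), List.of(2L, "b")));
        System.out.println(countContentRows(ddl));   // 0
        System.out.println(countContentRows(query)); // 2
        query.print();
    }
}
```

The DDL result still carries an "OK" row, but the caller never has to look at it. The enum alone tells it whether there is content worth collecting.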
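The fluent `StatementSet` idea (every `addInsert*` call returns the set itself, so that several INSERTs are buffered and then executed together) can be sketched as follows. This is a hypothetical illustration, not Flink's class. A "table" is represented by a query string, `addInsert` builds a textual INSERT statement, and `execute()` only returns the buffered statements that would be submitted as one job. Whether `execute()` should throw is left open in the thread, so this sketch does not throw.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical sketch of the fluent StatementSet proposal. Not Flink's API:
 * a "table" is modeled as a query string, and execute() merely returns the
 * statements that would be submitted together.
 */
public class StatementSetSketch {

    private final List<String> statements = new ArrayList<>();

    /** Adds a raw INSERT statement; returns this for fluent chaining. */
    public StatementSetSketch addInsertSql(String statement) {
        statements.add(statement);
        return this;
    }

    /** Appends the (hypothetical) query result into tablePath. */
    public StatementSetSketch addInsert(String tablePath, String query) {
        return addInsert(tablePath, query, false);
    }

    /** Overwrite variant, mirroring the proposed boolean flag. */
    public StatementSetSketch addInsert(String tablePath, String query, boolean overwrite) {
        String verb = overwrite ? "INSERT OVERWRITE " : "INSERT INTO ";
        statements.add(verb + tablePath + " " + query);
        return this;
    }

    /** Plain-text stand-in for a plan: the buffered statements, one per line. */
    public String explain() {
        return String.join("\n", statements);
    }

    /** Returns an immutable snapshot of the statements that would run as one job. */
    public List<String> execute() {
        return Collections.unmodifiableList(new ArrayList<>(statements));
    }

    public static void main(String[] args) {
        List<String> submitted = new StatementSetSketch()
                .addInsertSql("INSERT INTO sink1 SELECT * FROM src")
                .addInsert("sink2", "SELECT f0 FROM src")
                .addInsert("sink3", "SELECT f2 FROM src", true)
                .execute();
        submitted.forEach(System.out::println);
    }
}
```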