Hi Jark,

Thanks for the update. I think the FLIP looks really good at a high level.
I have a few comments on the code structure in the FLIP:

1) I really don't like how the TableDescriptor exposes protected fields. Moreover, why do we need to extend from it? I don't think we need KafkaConnector extends TableDescriptor and the like. We only need the builders, e.g. the KafkaConnectorBuilder. If I understand it correctly, this is the interface needed from the TableEnvironment perspective, and it is the contract that the TableEnvironment expects. I would suggest making it an interface:

    @PublicEvolving
    public interface TableDescriptor {

        List<String> getPartitionedFields();

        Schema getSchema();

        Map<String, String> getOptions();

        LikeOption[] getLikeOptions();

        String getLikePath();
    }

Then the TableDescriptorBuilder would work with an internal implementation of this interface:

    @PublicEvolving
    public abstract class TableDescriptorBuilder<BUILDER extends TableDescriptorBuilder<BUILDER>> {

        private final InternalTableDescriptor descriptor = new InternalTableDescriptor();

        /**
         * Returns this builder instance in the type of the subclass.
         */
        protected abstract BUILDER self();

        /**
         * Specifies the table schema.
         */
        public BUILDER schema(Schema schema) {
            descriptor.schema = schema;
            return self();
        }

        /**
         * Specifies the partition keys of this table.
         */
        public BUILDER partitionedBy(String... fieldNames) {
            checkArgument(descriptor.partitionedFields.isEmpty(),
                "partitionedBy(...) shouldn't be called more than once.");
            descriptor.partitionedFields.addAll(Arrays.asList(fieldNames));
            return self();
        }

        /**
         * Extends some parts from the original registered table path.
         */
        public BUILDER like(String tablePath, LikeOption... likeOptions) {
            descriptor.likePath = tablePath;
            descriptor.likeOptions = likeOptions;
            return self();
        }

        protected BUILDER option(String key, String value) {
            descriptor.options.put(key, value);
            return self();
        }

        /**
         * Returns the created table descriptor.
         */
        public TableDescriptor build() {
            return descriptor;
        }
    }

2) I'm also not the biggest fan of how the LikeOptions are suggested in the doc. Can't we have something more like:

    class LikeOption {

        public enum MergingStrategy { INCLUDING, EXCLUDING, OVERWRITING }

        public enum FeatureOption { ALL, CONSTRAINTS, GENERATED, OPTIONS, PARTITIONS, WATERMARKS }

        private final MergingStrategy mergingStrategy;
        private final FeatureOption featureOption;

        private LikeOption(MergingStrategy mergingStrategy, FeatureOption featureOption) {
            this.mergingStrategy = mergingStrategy;
            this.featureOption = featureOption;
        }

        public static LikeOption including(FeatureOption option) {
            return new LikeOption(MergingStrategy.INCLUDING, option);
        }

        public static LikeOption overwriting(FeatureOption option) {
            Preconditions.checkArgument(option != FeatureOption.ALL && ...);
            return new LikeOption(MergingStrategy.OVERWRITING, option);
        }
    }

3) "TableEnvironment#from(descriptor) will register the descriptor under a system-generated table path (just like TableImpl#toString) first, and scan from that table path to derive the Table. Table#executeInsert() does it in a similar way." I would try not to register the table under a generated table path. Do we really need that? I am pretty sure we can use the tables without registering them in a catalog, similarly to the old TableSourceQueryOperation.

Otherwise looks good.

Best,
Dawid
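For illustration, a minimal sketch of how a concrete connector builder could sit on top of the base class proposed above. The KafkaConnectorBuilder name, its topic() method, and the option key are hypothetical, not part of the FLIP:

    // Sketch only: a connector builder extending the proposed base class.
    @PublicEvolving
    public class KafkaConnectorBuilder extends TableDescriptorBuilder<KafkaConnectorBuilder> {

        public KafkaConnectorBuilder topic(String topic) {
            // stores a hypothetical option key via the protected helper
            return option("topic", topic);
        }

        @Override
        protected KafkaConnectorBuilder self() {
            return this;
        }
    }

With this split, only the builder is public API and the descriptor itself stays behind the TableDescriptor interface:

    TableDescriptor descriptor = new KafkaConnectorBuilder()
        .topic("user_logs")
        .build();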
On 23/07/2020 10:35, Timo Walther wrote:
> Hi Jark,
>
> thanks for the update. I think the FLIP is in really good shape now and
> ready to be voted on, if others have no further comments.
>
> I have one last comment around the methods of the descriptor builders.
> When refactoring classes such as `KafkaConnector` or
> `ElasticsearchConnector`, we should align the method names with the new
> property names introduced in FLIP-122:
>
>     KafkaConnector.newBuilder()
>         // similar to scan.startup.mode=earliest-offset
>         .scanStartupModeEarliest()
>         // similar to sink.partitioner=round-robin
>         .sinkPartitionerRoundRobin()
>
> What do you think?
>
> Thanks for driving this,
> Timo
>
> On 22.07.20 17:26, Jark Wu wrote:
>> Hi all,
>>
>> After some offline discussion with other people, I'm also fine with
>> using the builder pattern now, even though I still think the `.build()`
>> method is a little verbose in the user code.
>>
>> I have updated the FLIP with the following changes:
>>
>> 1) Use the builder pattern instead of the "new" keyword. In order to
>> avoid duplicate code and reduce the development burden for connector
>> developers, I introduced the abstract classes `TableDescriptorBuilder`
>> and `FormatDescriptorBuilder`. All the common methods are pre-defined
>> in the base builder classes, and all custom descriptor builders should
>> extend from them. We can then add more methods to the base builder
>> classes in the future without changes in the connectors.
>> 2) Use Expression instead of a SQL expression string for computed
>> columns and watermark strategies.
>> 3) Use `watermark(rowtime, expr)` as the watermark method.
>> 4) `Schema.column()` accepts `AbstractDataType` instead of `DataType`.
>> 5) Drop Schema#proctime and
>> Schema#watermarkFor#boundedOutOfOrderTimestamps.
>>
>> A full example will look like this:
>>
>>     tEnv.createTemporaryTable(
>>         "MyTable",
>>         KafkaConnector.newBuilder()
>>             .version("0.11")
>>             .topic("user_logs")
>>             .property("bootstrap.servers", "localhost:9092")
>>             .property("group.id", "test-group")
>>             .startFromEarliest()
>>             .sinkPartitionerRoundRobin()
>>             .format(JsonFormat.newBuilder().ignoreParseErrors(false).build())
>>             .schema(
>>                 Schema.newBuilder()
>>                     .column("user_id", DataTypes.BIGINT())
>>                     .column("user_name", DataTypes.STRING())
>>                     .column("score", DataTypes.DECIMAL(10, 2))
>>                     .column("log_ts", DataTypes.STRING())
>>                     .column("part_field_0", DataTypes.STRING())
>>                     .column("part_field_1", DataTypes.INT())
>>                     // define a processing-time attribute with column name "proc"
>>                     .column("proc", proctime())
>>                     .column("ts", toTimestamp($("log_ts")))
>>                     .watermark("ts", $("ts").minus(lit(3).seconds()))
>>                     .primaryKey("user_id")
>>                     .build())
>>             // Kafka doesn't support partitioned tables yet;
>>             // this is just an example for the API
>>             .partitionedBy("part_field_0", "part_field_1")
>>             .build()
>>     );
>>
>> I hope this resolves all your concerns. Further feedback is welcome!
>>
>> Updated FLIP:
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API#FLIP129:RefactorDescriptorAPItoregisterconnectorsinTableAPI-FormatDescriptor&FormatDescriptorBuilder
>>
>> POC:
>> https://github.com/wuchong/flink/tree/descriptor-POC/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptor3
>>
>> Best,
>> Jark
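To make the `FormatDescriptorBuilder` mentioned above concrete, here is a minimal sketch of what a format builder subclass could look like. It assumes the base class mirrors `TableDescriptorBuilder` with protected option(...) and self() helpers; this is an illustration, not the POC code:

    // Sketch only: a format builder on the proposed abstract base class.
    @PublicEvolving
    public class JsonFormatBuilder extends FormatDescriptorBuilder<JsonFormatBuilder> {

        public JsonFormatBuilder ignoreParseErrors(boolean ignoreParseErrors) {
            // surfaces as the "ignore-parse-errors" option in the format scope
            return option("ignore-parse-errors", String.valueOf(ignoreParseErrors));
        }

        @Override
        protected JsonFormatBuilder self() {
            return this;
        }
    }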
>> On Thu, 16 Jul 2020 at 20:18, Jark Wu <imj...@gmail.com> wrote:
>>
>>> Thank you all for the discussion!
>>>
>>> Here are my comments:
>>>
>>> 2) I agree we should support Expression as a computed column. But I'm
>>> in favor of Leonard's point that maybe we can also support a SQL string
>>> expression as a computed column, because that keeps it aligned with
>>> DDL. The concern with Expression is that converting an Expression to a
>>> SQL string, or (de)serializing an Expression, is another topic that is
>>> not yet clear and may involve lots of work. Maybe we can support
>>> Expression later if time permits.
>>>
>>> 6, 7) I still prefer the "new" keyword over a builder. I don't think
>>> immutability is a strong reason; I care more about usability and the
>>> experience from the users' and devs' perspective:
>>> - Users need to type more words when using a builder:
>>>   `KafkaConnector.newBuilder()...build()` vs `new KafkaConnector()...`.
>>> - It's more difficult for developers to write a descriptor: 2 classes
>>>   (KafkaConnector and KafkaConnectorBuilder) + 5 more methods (builders,
>>>   schema, partitionedBy, like, etc.). With the "new" keyword, all the
>>>   common methods are defined by the framework.
>>> - It's hard to keep the same API style across different connectors,
>>>   because the common methods are defined by users. For example, some may
>>>   have `withSchema`, `partitionKey`, `withLike`, etc.
>>>
>>> 8) I'm -1 on `ConfigOption`. The ConfigOption is not used on
>>> `JsonFormat`, but on the generic `Connector#option`. This doesn't work
>>> when using format options:
>>>
>>>     new Connector("kafka")
>>>         // this is wrong, because "kafka" requires
>>>         // "json.ignore-parse-errors" as the option key,
>>>         // not "ignore-parse-errors"
>>>         .option(JsonOptions.IGNORE_PARSE_ERRORS, true);
>>>
>>> ========================================
>>> Hi Timo, regarding having a completely new stack: I have thought about
>>> that, but I still prefer to refactor the existing stack, because I
>>> think it will be more confusing if users see two similar stacks and may
>>> run into many problems when using the wrong class. For example, we
>>> would have two `Schema` and `TableDescriptor` classes; the
>>> `KafkaConnector` couldn't be used in the legacy `connect()` API, and
>>> the legacy `Kafka` class couldn't be used in the new
>>> `createTemporaryTable()` API. Besides, the existing API has been
>>> deprecated in 1.11, so I think it's fine to remove it in 1.12.
>>>
>>> Best,
>>> Jark
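The key-prefixing problem behind point 8 can be spelled out in a few lines. The helper below is hypothetical and only illustrates why the raw key of a format ConfigOption is insufficient for a generic Connector#option:

    // A table factory expects format options scoped by the format identifier,
    // so the raw key of a format ConfigOption cannot be used directly.
    import org.apache.flink.configuration.ConfigOption;

    final class FormatOptionKeys {

        // e.g. ("json", IGNORE_PARSE_ERRORS) -> "json.ignore-parse-errors"
        static String formatScopedKey(String formatIdentifier, ConfigOption<?> option) {
            return formatIdentifier + "." + option.key();
        }
    }

A generic `Connector#option(ConfigOption, value)` has no way of knowing the format identifier, which is why the option key ends up wrong in the example above.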
>>> On Thu, 16 Jul 2020 at 15:26, Jingsong Li <jingsongl...@gmail.com> wrote:
>>>
>>>> Thanks for the discussion.
>>>>
>>>> The descriptor lacks watermark support, and defining computed columns
>>>> is too verbose.
>>>>
>>>> 1) +1 for just `column(...)`.
>>>>
>>>> 2) +1 for being consistent with the Table API: the Java Table API
>>>> should be the Expression DSL. We don't need pure string support; users
>>>> should just use DDL instead. I think this is just a schema descriptor?
>>>> The schema descriptor should be consistent with DDL, so it should
>>>> definitely contain computed column information.
>>>>
>>>> 3) +1 for not containing Schema#proctime and
>>>> Schema#watermarkFor#boundedOutOfOrderTimestamps; we can just leave them
>>>> in the legacy APIs.
>>>>
>>>> 6, 7) +1 for removing "new" and for builders, making the descriptors
>>>> immutable. For Jark: only the starting method is static, the others
>>>> are not.
>>>>
>>>> 8) +1 for `ConfigOption`; this can be consistent with
>>>> `WritableConfig`. For Leonard: the user doesn't need
>>>> "json.fail-on-missing-field"; the user needs "fail-on-missing-field",
>>>> because the recommended way is "JsonFormat.newInstance().option(....)",
>>>> which configures options in the format scope.
>>>>
>>>> Best,
>>>> Jingsong
>>>>
>>>> On Wed, Jul 15, 2020 at 9:30 PM Leonard Xu <xbjt...@gmail.com> wrote:
>>>>
>>>>> Thanks Jark for bringing up this discussion and organizing the FLIP
>>>>> document, and thanks Dawid and Timo for the feedback. Here are my
>>>>> thoughts.
>>>>>
>>>>> 1) I'm +1 with using column() for both cases.
>>>>>
>>>>> 2) Expression DSL vs pure SQL string for computed columns
>>>>>
>>>>> I think we can support them both and implement the pure SQL string
>>>>> first. I agree that the Expression DSL brings more possibility and
>>>>> flexibility, but using a SQL string is a more unified way which can
>>>>> reuse most logic with DDL, like validation and persisting in the
>>>>> Catalog. Converting the Expression DSL to a SQL expression is another
>>>>> big topic, and I have not figured out a feasible idea so far. So maybe
>>>>> we can postpone the Expression DSL support, considering the reality.
>>>>>
>>>>> 3) Methods Schema#proctime and
>>>>> Schema#watermarkFor#boundedOutOfOrderTimestamps
>>>>>
>>>>> +1 with Dawid's proposal to offer SQL-like methods:
>>>>>
>>>>>     Schema()
>>>>>         .column("proctime", proctime())
>>>>>         .watermarkFor("rowtime", $("rowtime").minus(lit(3).seconds()))
>>>>>
>>>>> And we can simplify watermarkFor("colName", Expression
>>>>> watermarkStrategy) to watermark("colName", Expression
>>>>> watermarkStrategy); I think the latter expresses the meaning of
>>>>> "WATERMARK FOR column_name AS watermark_strategy_expression" well.
>>>>>
>>>>> 5) 6) 7) The new keyword vs the static method vs the builder pattern
>>>>>
>>>>> I have no strong tendency; the new keyword and a static method on the
>>>>> descriptor can nearly be treated as a builder and do the same things a
>>>>> builder does. With the builder pattern, we would introduce six more
>>>>> methods (connector.Builder(), connector.Builder.build(),
>>>>> format.Builder(), format.Builder.build(), Schema.Builder(),
>>>>> Schema.Builder.build()); I think we could avoid these unnecessary
>>>>> methods. I'm slightly +1 for the new keyword if we need a choice.
>>>>>
>>>>> 8) `Connector.option(...)` should also accept `ConfigOption`
>>>>>
>>>>> I'm slightly -1 on this. ConfigOption may not work, because the key of
>>>>> a format ConfigOption has no format prefix, e.g. FAIL_ON_MISSING_FIELD
>>>>> of json: we need "json.fail-on-missing-field" rather than
>>>>> "fail-on-missing-field".
>>>>>
>>>>>     public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD =
>>>>>         ConfigOptions
>>>>>             .key("fail-on-missing-field")
>>>>>             .booleanType()
>>>>>             .defaultValue(false);
>>>>>
>>>>> WDYT?
>>>>>
>>>>> Best,
>>>>> Leonard Xu
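As a side-by-side rendering of the watermark method discussed above, a small sketch of the descriptor call next to the DDL clause it mirrors, written in the builder style that the thread later settled on:

    // DDL:  WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.lit;

    Schema schema = Schema.newBuilder()
        .column("ts", DataTypes.TIMESTAMP(3))
        .watermark("ts", $("ts").minus(lit(3).seconds()))
        .build();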
>>>>>> On 15 Jul 2020, at 16:37, Timo Walther <twal...@apache.org> wrote:
>>>>>>
>>>>>> Hi Jark,
>>>>>>
>>>>>> thanks for working on this issue. It is time to fix this last bit of
>>>>>> inconsistency in the API. I also like the core parts of the FLIP,
>>>>>> esp. that TableDescriptor is one entity that can be passed to
>>>>>> different methods. Here is some feedback from my side:
>>>>>>
>>>>>> 1) +1 for just `column(...)`
>>>>>>
>>>>>> 2) Expression DSL vs pure SQL string for computed columns
>>>>>> I agree with Dawid. Using the Expression DSL is desirable for a
>>>>>> consistent API. Furthermore, people would otherwise need to register
>>>>>> functions if they want to use them in an expression. Refactoring
>>>>>> TableSchema is definitely on the list for 1.12. Maybe we can come up
>>>>>> with some intermediate solution where we transform the expression
>>>>>> into a SQL expression for the catalog, until the discussions around
>>>>>> FLIP-80 and CatalogTableSchema have been finalized.
>>>>>>
>>>>>> 3) Schema#proctime and
>>>>>> Schema#watermarkFor#boundedOutOfOrderTimestamps
>>>>>> We should design the descriptor very close to the SQL syntax. The
>>>>>> more similar the syntax, the more likely it is that the new
>>>>>> descriptor API stays stable.
>>>>>>
>>>>>> 6) static method vs new keyword
>>>>>> Actually, the `new` keyword was one of the things that bothered me
>>>>>> most in the old design. Fluent APIs avoid it nowadays.
>>>>>>
>>>>>> 7) make the descriptors immutable with builders
>>>>>> The descriptors are some kind of builders already, but they are not
>>>>>> called "builder". Instead of coming up with the new concept of a
>>>>>> "descriptor", we should use terminology that people, esp. Java/Scala
>>>>>> users, are already familiar with.
>>>>>>
>>>>>> We could make the descriptors immutable so they can be passed around
>>>>>> easily.
>>>>>>
>>>>>> Btw, "Connector" and "Format" should always be in the class name.
>>>>>> This was also a mistake in the past. Instead of calling the
>>>>>> descriptor just `Kafka`, we could call it `KafkaConnector`. An entire
>>>>>> example could look like:
>>>>>>
>>>>>>     tEnv.createTemporaryTable(
>>>>>>         "OrdersInKafka",
>>>>>>         KafkaConnector.newBuilder() // builder pattern supported by IDE
>>>>>>             .topic("user_logs")
>>>>>>             .property("bootstrap.servers", "localhost:9092")
>>>>>>             .property("group.id", "test-group")
>>>>>>             .format(JsonFormat.newInstance()) // shortcut for no parameters
>>>>>>             .schema(
>>>>>>                 Schema.newBuilder()
>>>>>>                     .column("user_id", DataTypes.BIGINT())
>>>>>>                     .column("score", DataTypes.DECIMAL(10, 2))
>>>>>>                     .column("log_ts", DataTypes.TIMESTAMP(3))
>>>>>>                     .column("my_ts", toTimestamp($("log_ts")))
>>>>>>                     .build()
>>>>>>             )
>>>>>>             .build()
>>>>>>     );
>>>>>>
>>>>>> Instead of refactoring the existing classes, we could also think
>>>>>> about a completely new stack. I think this would avoid confusion for
>>>>>> the old users. We could deprecate the entire `Kafka` class instead of
>>>>>> dealing with backwards compatibility.
>>>>>>
>>>>>> 8) minor extensions
>>>>>> A general `Connector.option(...)` method should also accept
>>>>>> `ConfigOption` instead of only strings.
>>>>>> `Schema.column()` should accept an `AbstractDataType` that can be
>>>>>> resolved to a `DataType` by access to a `DataTypeFactory`.
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Thanks,
>>>>>> Timo
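A minimal sketch of what the `ConfigOption` overload from point 8 could look like, written against the builder internals proposed elsewhere in this thread (the options map and the self() method); how the value is stringified here is an assumption:

    // Sketch only: a ConfigOption-accepting overload for the builder.
    public <T> BUILDER option(ConfigOption<T> option, T value) {
        descriptor.options.put(option.key(), String.valueOf(value));
        return self();
    }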
>>>>>> On 09.07.20 18:51, Jark Wu wrote:
>>>>>>> Hi Dawid,
>>>>>>>
>>>>>>> Thanks for the great feedback! Here are my responses:
>>>>>>>
>>>>>>> 1) computedColumn(..) vs column(..)
>>>>>>> I'm fine with using `column(..)` in both cases.
>>>>>>>
>>>>>>> 2) Expression DSL vs pure SQL string for computed columns
>>>>>>> This is a good point. Actually, I also prefer to use the Expression
>>>>>>> DSL because it is more Table API style. However, this requires
>>>>>>> modifying TableSchema again to accept & expose Expression as
>>>>>>> computed columns. I'm not convinced about this because, AFAIK, we
>>>>>>> want to have a CatalogTableSchema to hold this information and don't
>>>>>>> want to extend TableSchema. Maybe Timo can give some points here.
>>>>>>> Besides, this would make it impossible to persist the descriptor API
>>>>>>> in the Catalog unless FLIP-80 is done.
>>>>>>>
>>>>>>> 3) Schema#proctime and
>>>>>>> Schema#watermarkFor#boundedOutOfOrderTimestamps
>>>>>>> The original intention behind these APIs was to provide shortcut
>>>>>>> APIs for Table API users. But I'm also fine with only providing the
>>>>>>> DDL-like methods if you have concerns. We can discuss shortcuts in
>>>>>>> the future if users request them.
>>>>>>>
>>>>>>> 4) LikeOption
>>>>>>> LikeOption.INCLUDING.ALL is a constant (an enum value). I have added
>>>>>>> more description about this to the FLIP.
>>>>>>>
>>>>>>> 5) implementation?
>>>>>>> I didn't want to mention too many implementation details in the FLIP
>>>>>>> at the beginning, because the API part is already very long. But I
>>>>>>> have added an "Implementation" section to explain them.
>>>>>>>
>>>>>>> 6) static method vs new keyword
>>>>>>> Personally, I prefer the new keyword because it makes the API
>>>>>>> cleaner. If we want to remove the new keyword and use static
>>>>>>> methods, we would have to either add a `Schema.builder()/create()`
>>>>>>> method as the starting method, or duplicate all the methods as
>>>>>>> static methods. E.g. we have 12 methods in `Kafka`; any of them can
>>>>>>> be a starting method, so we would end up with 24 methods in `Kafka`.
>>>>>>> Neither option is good, and it's hard to keep all the descriptors
>>>>>>> using the same starting method name, whereas all descriptors can
>>>>>>> start from the same new keyword.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
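To illustrate point 4 above, a sketch of how the LikeOption constants could be passed to the builder's like(...) method. It assumes the FLIP's nested enums implement the LikeOption interface; the KafkaConnector builder is just a placeholder here:

    // Mirrors the DDL clause:
    //   CREATE TABLE ... LIKE base_table (INCLUDING ALL, OVERWRITING OPTIONS)
    TableDescriptor descriptor = KafkaConnector.newBuilder()
        .like("base_table",
            LikeOption.INCLUDING.ALL,
            LikeOption.OVERWRITING.OPTIONS)
        .build();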
>>>>>>>
>>>>>>> On Thu, 9 Jul 2020 at 15:48, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>>>>>>
>>>>>>>> Correction to my point 4: the example is correct; I did not read it
>>>>>>>> carefully enough. Sorry for the confusion. Nevertheless, I'd still
>>>>>>>> like to see a bit more explanation on the LikeOptions.
>>>>>>>>
>>>>>>>> On 07/07/2020 04:32, Jark Wu wrote:
>>>>>>>>> Hi everyone,
>>>>>>>>>
>>>>>>>>> Leonard and I prepared a FLIP about refactoring the current
>>>>>>>>> Descriptor API, i.e. TableEnvironment#connect(). We would like to
>>>>>>>>> propose a new descriptor API to register connectors in the Table
>>>>>>>>> API.
>>>>>>>>>
>>>>>>>>> Since Flink 1.9, the community has focused more on the new SQL DDL
>>>>>>>>> feature. After a series of releases, the SQL DDL is powerful and
>>>>>>>>> has many rich features now. However, the Descriptor API (the
>>>>>>>>> `TableEnvironment#connect()`) has been stagnant for a long time
>>>>>>>>> and is missing lots of core features, such as computed columns and
>>>>>>>>> primary keys. That's frustrating for Table API users who want to
>>>>>>>>> register tables programmatically. Besides, currently a connector
>>>>>>>>> must implement a corresponding Descriptor (e.g. `new Kafka()`)
>>>>>>>>> before the "connect" API can be used. We therefore hope to reduce
>>>>>>>>> this effort for connector developers, so that custom sources and
>>>>>>>>> sinks can be registered via the descriptor API without
>>>>>>>>> implementing a Descriptor.
>>>>>>>>>
>>>>>>>>> These are the problems we want to resolve in this FLIP. I'm
>>>>>>>>> looking forward to your comments.
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connector+for+Table+API
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jark