Thanks Jark for the update. The latest FLIP looks well.
I like Dawid’s proposal of TableDescriptor. Best Leonard Xu > 在 2020年7月23日,22:56,Jark Wu <imj...@gmail.com> 写道: > > Hi Timo, > > That's a good point I missed in the design. I have updated the FLIP and added > a note under the `KafkaConnector` to mention this. > I will not list all the method names in the FLIP as the design doc is super > long now. > > ================================================================ > Hi Dawid, > > 1) KafkaConnector not extends TableDescriptor > The reason why KafkaConnector extends TableDescriptor is that, a builder > pattern "KafkaConnector.newBuilder()...build()" should return > "KafkaConnector" in theory. > So users can write something like the following code which might be more > intuitive. > > KafkaConnector kafka = KafkaConnector.newBuilder()...build(); > tEnv.createTemporaryTable("MyTable", kafka); > > But I agree connector implementation will be simpler if this is not strongly > needed, e.g. we don't need the generic type for descriptor, > we don't need to pass the descriptor class in the builder. So I'm also fine > to not extend it if others don't against it. What's your opinion here @Timo > Walther <mailto:twal...@apache.org> ? > > 2) LikeOptions > I am not very satisfied with the new design. Because the API is not very > fluent. Users will be interrupted to consider what the `overwrite()` > parameter to be. > And the API design doesn't protect users from using the wrong options before > running the code. > What about to list all possible options in one level? This will be more > aligned with SQL DDL and easy to understand and use for users. > > public enum LikeOption { > INCLUDING_ALL, > INCLUDING_CONSTRAINTS, > INCLUDING_GENERATED, > INCLUDING_OPTIONS, > INCLUDING_PARTITIONS, > INCLUDING_WATERMARKS, > > EXCLUDING_ALL, > EXCLUDING_CONSTRAINTS, > EXCLUDING_GENERATED, > EXCLUDING_OPTIONS, > EXCLUDING_PARTITIONS, > EXCLUDING_WATERMARKS, > > OVERWRITING_GENERATED, > OVERWRITING_OPTIONS > } > > 3) register the table under a generated table path > I'm afraid we have to do that. The generated table path is still needed for > `TableSourceTable#tableIdentifier` which is used to calculate the digest. > This requires that the registered table must have an unique identifier. The > old `TableSourceQueryOperation` will also generate the identifier according > to the hashcode of the TableSource object. However, the generated identifier > "Unregistered_TableSource_1234" is still possible to be in conflict with > the user's table path. Therefore, I prefer to register the generated name in > the (temporary) catalog to throw explicit exceptions, rather than generating > a wrong plan. > > ================================================================ > Hi @Leonard Xu <mailto:xbjt...@gmail.com> and @Jingsong Li > <mailto:jingsongl...@gmail.com> , > > Do you have other concerns on the latest FLIP and the above discussion? > > Best, > Jark > > On Thu, 23 Jul 2020 at 18:26, Dawid Wysakowicz <dwysakow...@apache.org > <mailto:dwysakow...@apache.org>> wrote: > Hi Jark, > > Thanks for the update. I think the FLIP looks really well on the high level. > > I have a few comments to the code structure in the FLIP: > > 1) I really don't like how the TableDescriptor exposes protected fields. > Moreover why do we need to extend from it? I don't think we need > KafkaConnector extends TableDescriptor and alike. We only need the builders > e.g. the KafkaConnectorBuilder. > > If I understand it correctly this is the interface needed from the > TableEnvironment perspective and it is the contract that the TableEnvironment > expects. I would suggest making it an interface: > > @PublicEvolving > public interface TableDescriptor { > List<String> getPartitionedFields(); > Schema getSchema(); > Map<String, String> getOptions(); > LikeOption[] getLikeOptions(); > String getLikePath(); > } > > Then the TableDescriptorBuilder would work with an internal implementation of > this interface > > @PublicEvolving > public abstract class TableDescriptorBuilder<BUILDER extends > TableDescriptorBuilder<BUILDER>> { > > private final InternalTableDescriptor descriptor = new > InternalTableDescriptor(); > > /** > * Returns the this builder instance in the type of subclass. > */ > protected abstract BUILDER self(); > > /** > * Specifies the table schema. > */ > public BUILDER schema(Schema schema) { > descriptor.schema = schema; > return self(); > } > > /** > * Specifies the partition keys of this table. > */ > public BUILDER partitionedBy(String... fieldNames) { > checkArgument(descriptor.partitionedFields.isEmpty(), > "partitionedBy(...) shouldn't be called more than once."); > descriptor.partitionedFields.addAll(Arrays.asList(fieldNames)); > return self(); > } > > /** > * Extends some parts from the original registered table path. > */ > public BUILDER like(String tablePath, LikeOption... likeOptions) { > descriptor.likePath = tablePath; > descriptor.likeOptions = likeOptions; > return self(); > } > > protected BUILDER option(String key, String value) { > descriptor.options.put(key, value); > return self(); > } > > /** > * Returns created table descriptor. > */ > public TableDescriptor build() { > return descriptor; > } > } > > > 2) I'm also not the biggest fun of how the LikeOptions are suggested in the > doc. Can't we have something more like > > class LikeOption { > > public enum MergingStrategy { > INCLUDING, > EXCLUDING, > OVERWRITING > } > > public enum FeatureOption { > ALL, > CONSTRAINTS, > GENERATED, > OPTIONS, > PARTITIONS, > WATERMARKS > } > > private final MergingStrategy mergingStrategy; > private final FeatureOption featureOption; > > > > public static final LikeOption including(FeatureOption option) { > > return new LikeOption(MergingStrategy.INCLUDING, option); > > } > > public static final LikeOption overwriting(FeatureOption option) { > > Preconditions.checkArgument(option != ALL && ...); > > return new LikeOption(MergingStrategy.INCLUDING, option); > > } > > } > > > > 3) TableEnvironment#from(descriptor) will register descriptor under a system > generated table path (just like TableImpl#toString) first, and scan from the > table path to derive the Table. Table#executeInsert() does it in the similar > way. > > I would try not to register the table under a generated table path. Do we > really need that? I am pretty sure we can use the tables without registering > them in a catalog. Similarly to the old TableSourceQueryOperation. > > Otherwise looks good > > Best, > > Dawid > > > On 23/07/2020 10:35, Timo Walther wrote: >> Hi Jark, >> >> thanks for the update. I think the FLIP is in a really good shape now and >> ready to be voted. If others have no further comments? >> >> I have one last comment around the methods of the descriptor builders. When >> refactoring classes such as `KafkaConnector` or `ElasticsearchConnector`. We >> should align the method names with the new property names introduced in >> FLIP-122: >> >> KafkaConnector.newBuilder() >> // similar to scan.startup.mode=earliest-offset >> .scanStartupModeEarliest() >> // similar to sink.partitioner=round-robin >> .sinkPartitionerRoundRobin() >> >> What do you think? >> >> Thanks for driving this, >> Timo >> >> >> On 22.07.20 17:26, Jark Wu wrote: >>> Hi all, >>> >>> After some offline discussion with other people, I'm also fine with using >>> the builder pattern now, >>> even though I still think the `.build()` method is a little verbose in >>> the >>> user code. >>> >>> I have updated the FLIP with following changes: >>> >>> 1) use builder pattern instead of "new" keyword. In order to avoid >>> duplicate code and reduce development burden for connector developers, >>> I introduced abstract classes `TableDescriptorBuilder` and >>> `FormatDescriptorBuilder`. >>> All the common methods are pre-defined in the base builder class, all >>> the custom descriptor builder should extend from the base builder classes. >>> And we can add more methods into the base builder class in the future >>> without changes in the connectors. >>> 2) use Expression instead of SQL expression string for computed column and >>> watermark strategy >>> 3) use `watermark(rowtime, expr)` as the watermark method. >>> 4) `Schema.column()` accepts `AbstractDataType` instead of `DataType` >>> 5) drop Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps >>> >>> A full example will look like this: >>> >>> tEnv.createTemporaryTable( >>> "MyTable", >>> KafkaConnector.newBuilder() >>> .version("0.11") >>> .topic("user_logs") >>> .property("bootstrap.servers", "localhost:9092") >>> .property("group.id <http://group.id/>", "test-group") >>> .startFromEarliest() >>> .sinkPartitionerRoundRobin() >>> .format(JsonFormat.newBuilder().ignoreParseErrors(false).build()) >>> .schema( >>> Schema.newBuilder() >>> .column("user_id", DataTypes.BIGINT()) >>> .column("user_name", DataTypes.STRING()) >>> .column("score", DataTypes.DECIMAL(10, 2)) >>> .column("log_ts", DataTypes.STRING()) >>> .column("part_field_0", DataTypes.STRING()) >>> .column("part_field_1", DataTypes.INT()) >>> .column("proc", proctime()) // define a processing-time >>> attribute with column name "proc" >>> .column("ts", toTimestamp($("log_ts"))) >>> .watermark("ts", $("ts").minus(lit(3).seconds())) >>> .primaryKey("user_id") >>> .build()) >>> .partitionedBy("part_field_0", "part_field_1") // Kafka doesn't >>> support partitioned table yet, this is just an example for the API >>> .build() >>> ); >>> >>> I hope this resolves all your concerns. Welcome for further feedback! >>> >>> Updated FLIP: >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API#FLIP129:RefactorDescriptorAPItoregisterconnectorsinTableAPI-FormatDescriptor&FormatDescriptorBuilder >>> >>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API#FLIP129:RefactorDescriptorAPItoregisterconnectorsinTableAPI-FormatDescriptor&FormatDescriptorBuilder> >>> >>> >>> POC: >>> https://github.com/wuchong/flink/tree/descriptor-POC/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptor3 >>> >>> <https://github.com/wuchong/flink/tree/descriptor-POC/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptor3> >>> >>> >>> Best, >>> Jark >>> >>> On Thu, 16 Jul 2020 at 20:18, Jark Wu <imj...@gmail.com> >>> <mailto:imj...@gmail.com> wrote: >>> >>>> Thank you all for the discussion! >>>> >>>> Here are my comments: >>>> >>>> 2) I agree we should support Expression as a computed column. But I'm in >>>> favor of Leonard's point that maybe we can also support SQL string >>>> expression as a computed column. >>>> Because it also keeps aligned with DDL. The concern for Expression is that >>>> converting Expression to SQL string, or (de)serializing Expression is >>>> another topic not clear and may involve lots of work. >>>> Maybe we can support Expression later if time permits. >>>> >>>> 6,7) I still prefer the "new" keyword over builder. I don't think >>>> immutable is a strong reason. I care more about usability and experience >>>> from users and devs perspective. >>>> - Users need to type more words if using builder: >>>> `KafkaConnector.newBuilder()...build()` vs `new KafkaConnector()...` >>>> - It's more difficult for developers to write a descriptor. 2 classes >>>> (KafkaConnector and KafkaConnectorBuilder) + 5 more methods (builders, >>>> schema, partitionedBy, like, etc..). >>>> With the "new" keyword all the common methods are defined by the >>>> framework. >>>> - It's hard to have the same API style for different connectors, >>>> because >>>> the common methods are defined by users. For example, some may have >>>> `withSchema`, `partitionKey`, `withLike`, etc... >>>> >>>> 8) I'm -1 to `ConfigOption`. The ConfigOption is not used on `JsonFormat`, >>>> but the generic `Connector#option`. This doesn't work when using format >>>> options. >>>> >>>> new Connector("kafka") >>>> .option(JsonOptions.IGNORE_PARSE_ERRORS, true); // this is wrong, >>>> because "kafka" requires "json.ignore-parse-errors" as the option key, not >>>> the "ignore-parse-errors". >>>> >>>> >>>> ======================================== >>>> Hi Timo, regarding having a complete new stack, I have thought about that. >>>> But I still prefer to refactor the existing stack. Reasons: >>>> Because I think it will be more confusing if users will see two similar >>>> stacks and may have many problems if using the wrong class. >>>> For example, we may have two `Schema` and `TableDescriptor` classes. The >>>> `KafkaConnector` can't be used in legacy `connect()` API, >>>> the legacy `Kafka` class can't be used in the new `createTemporaryTable()` >>>> API. >>>> Besides, the existing API has been deprecated in 1.11, I think it's fine >>>> to remove them in 1.12. >>>> >>>> >>>> Best, >>>> Jark >>>> >>>> >>>> On Thu, 16 Jul 2020 at 15:26, Jingsong Li <jingsongl...@gmail.com> >>>> <mailto:jingsongl...@gmail.com> wrote: >>>> >>>>> Thanks for the discussion. >>>>> >>>>> Descriptor lacks the watermark and the computed column is too long. >>>>> >>>>> 1) +1 for just `column(...)` >>>>> >>>>> 2) +1 for being consistent with Table API, the Java Table API should be >>>>> Expression DSL. We don't need pure string support, users should just use >>>>> DDL instead. I think this is just a schema descriptor? The schema >>>>> descriptor should be consistent with DDL, so, definitely, it should >>>>> contain computed columns information. >>>>> >>>>> 3) +1 for not containing Schema#proctime and >>>>> Schema#watermarkFor#boundedOutOfOrderTimestamps, we can just leave them >>>>> in >>>>> legacy apis. >>>>> >>>>> 6,7) +1 for removing "new" and builder and making it immutable, For Jark, >>>>> the starting method is the static method, the others are not. >>>>> >>>>> 8) +1 for `ConfigOption`, this can be consistent with `WritableConfig`. >>>>> For Leonard, I don't think user needs “json.fail-on-missing-field” rather >>>>> than “fail-on-missing-field”, user should >>>>> need “fail-on-missing-field” rather than “json.fail-on-missing-field", >>>>> the >>>>> recommended way is "JsonFormat.newInstance().option(....)", should >>>>> configure options in the format scope. >>>>> >>>>> Best, >>>>> Jingsong >>>>> >>>>> On Wed, Jul 15, 2020 at 9:30 PM Leonard Xu <xbjt...@gmail.com> >>>>> <mailto:xbjt...@gmail.com> wrote: >>>>> >>>>>> Thanks Jark bring this discussion and organize the FLIP document. >>>>>> >>>>>> Thanks Dawid and Timo for the feedback. Here are my thoughts. >>>>>> >>>>>> 1) I’m +1 with using column() for both cases. >>>>>> >>>>>> 2) Expression DSL vs pure SQL string for computed columns >>>>>> >>>>>> I think we can support them both and implement the pure SQL String >>>>>> first, >>>>>> I agree that Expression DSL brings more possibility and flexibility, but >>>>>> using SQL string is a more unified way which can reuse most logic with >>>>>> DDL >>>>>> like validation and persist in Catalog, >>>>>> and Converting Expression DSL to SQL Expression is another big topic and >>>>>> I did not figure out a feasible idea until now. >>>>>> So, maybe we can postpone the Expression DSL support considered the >>>>>> reality. >>>>>> >>>>>> 3) Methods Schema#proctime and >>>>>> Schema#watermarkFor#boundedOutOfOrderTimestamps >>>>>> >>>>>> +1 with Dawid’s proposal to offer SQL like methods. >>>>>> Schema() >>>>>> .column("proctime", proctime()); >>>>>> .watermarkFor("rowtime", $("rowtime").minus(lit(3).seconds())) >>>>>> And we can simplify watermarkFor(“colName”, Expression >>>>>> watermarkStrategy)to watermark(“colName”, Expression watermarkStrategy), >>>>>> I >>>>>> think the later one has can express the meaning of “ WATERMARK FOR >>>>>> column_name AS watermark_strategy_expression“ well. >>>>>> >>>>>> 5)6)7) The new keyword vs the static method vs builder pattern >>>>>> >>>>>> I have not strong tendency, the new keyword and the static method on >>>>>> descriptor can nearly treated as a builder and do same things like >>>>>> builder. >>>>>> For the builder pattern, we will introduce six >>>>>> methods(connector.Builder()、connector.Builder.build(), format.Builder(), >>>>>> format.Builder.build(), Schema.Builder(),Schema.Builder.build() ),I >>>>>> think >>>>>> we could reduce these unnecessary methods. I ‘m slightly +1 for new >>>>>> keyword if we need a choice. >>>>>> >>>>>> 8) `Connector.option(...)` class should also accept `ConfigOption` >>>>>> I’m slightly -1 for this, ConfigOption may not work because the key for >>>>>> format configOption has not format prefix eg: FAIL_ON_MISSING_FIELD of >>>>>> json, we need “json.fail-on-missing-field” rather than >>>>>> “fail-on-missing-field”. >>>>>> >>>>>> public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD = >>>>>> ConfigOptions >>>>>> .key("fail-on-missing-field") >>>>>> .booleanType() >>>>>> .defaultValue(false) >>>>>> >>>>>> WDYT? >>>>>> >>>>>> Best, >>>>>> Leonard Xu >>>>>> >>>>>> >>>>>>> 在 2020年7月15日,16:37,Timo Walther <twal...@apache.org> >>>>>>> <mailto:twal...@apache.org> 写道: >>>>>>> >>>>>>> Hi Jark, >>>>>>> >>>>>>> thanks for working on this issue. It is time to fix this last part of >>>>>> inconsistency in the API. I also like the core parts of the FLIP, esp. >>>>>> that >>>>>> TableDescriptor is one entity that can be passed to different methods. >>>>>> Here >>>>>> is some feedback from my side: >>>>>>> >>>>>>> 1) +1 for just `column(...)` >>>>>>> >>>>>>> 2) Expression DSL vs pure SQL string for computed columns >>>>>>> I agree with Dawid. Using the Expression DSL is desireable for a >>>>>> consistent API. Furthermore, otherwise people need to register functions >>>>>> if >>>>>> they want to use them in an expression. Refactoring TableSchema is >>>>>> definitely on the list for 1.12. Maybe we can come up with some >>>>>> intermediate solution where we transform the expression to a SQL >>>>>> expression >>>>>> for the catalog. Until the discussions around FLIP-80 and >>>>>> CatalogTableSchema have been finalized. >>>>>>> >>>>>>> 3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps >>>>>>> We should design the descriptor very close to the SQL syntax. The more >>>>>> similar the syntax the more likely it is too keep the new descriptor API >>>>>> stable. >>>>>>> >>>>>>> 6) static method vs new keyword >>>>>>> Actually, the `new` keyword was one of the things that bothered me >>>>>> most in the old design. Fluent APIs avoid this nowadays. >>>>>>> >>>>>>> 7) make the descriptors immutable with builders >>>>>>> The descriptors are some kind of builders already. But they are not >>>>>> called "builder". Instead of coming up with the new concept of a >>>>>> "descriptor", we should use terminology that people esp. Java/Scala >>>>>> users >>>>>> are familiar with already. >>>>>>> >>>>>>> We could make the descriptors immutable to pass them around easily. >>>>>>> >>>>>>> Btw "Connector" and "Format" should always be in the classname. This >>>>>> was also a mistake in the past. Instead of calling the descriptor just >>>>>> `Kafka` we could call it `KafkaConnector`. An entire example could look >>>>>> like: >>>>>>> >>>>>>> tEnv.createTemporaryTable( >>>>>>> "OrdersInKafka", >>>>>>> KafkaConnector.newBuilder() // builder pattern supported by IDE >>>>>>> .topic("user_logs") >>>>>>> .property("bootstrap.servers", "localhost:9092") >>>>>>> .property("group.id <http://group.id/>", "test-group") >>>>>>> .format(JsonFormat.newInstance()) // shortcut for no parameters >>>>>>> .schema( >>>>>>> Schema.newBuilder() >>>>>>> .column("user_id", DataTypes.BIGINT()) >>>>>>> .column("score", DataTypes.DECIMAL(10, 2)) >>>>>>> .column("log_ts", DataTypes.TIMESTAMP(3)) >>>>>>> .column("my_ts", toTimestamp($("log_ts")) >>>>>>> .build() >>>>>>> ) >>>>>>> .build() >>>>>>> ); >>>>>>> >>>>>>> Instead of refacoring the existing classes, we could also think about >>>>>> a completly new stack. I think this would avoid confusion for the old >>>>>> users. We could deprecate the entire `Kafka` class instead of dealing >>>>>> with >>>>>> backwards compatibility. >>>>>>> >>>>>>> 8) minor extensions >>>>>>> A general `Connector.option(...)` class should also accept >>>>>> `ConfigOption` instead of only strings. >>>>>>> A `Schema.column()` should accept `AbstractDataType` that can be >>>>>> resolved to a `DataType` by access to a `DataTypeFactory`. >>>>>>> >>>>>>> What do you think? >>>>>>> >>>>>>> Thanks, >>>>>>> Timo >>>>>>> >>>>>>> >>>>>>> On 09.07.20 18:51, Jark Wu wrote: >>>>>>>> Hi Dawid, >>>>>>>> Thanks for the great feedback! Here are my responses: >>>>>>>> 1) computedColumn(..) vs column(..) >>>>>>>> I'm fine to use `column(..)` in both cases. >>>>>>>> 2) Expression DSL vs pure SQL string for computed columns >>>>>>>> This is a good point. Actually, I also prefer to use Expression DSL >>>>>> because >>>>>>>> this is more Table API style. >>>>>>>> However, this requires to modify TableSchema again to accept & expose >>>>>>>> Expression as computed columns. >>>>>>>> I'm not convinced about this, because AFAIK, we want to have a >>>>>>>> CatalogTableSchema to hold this information >>>>>>>> and don't want to extend TableSchema. Maybe Timo can give some points >>>>>> here. >>>>>>>> Besides, this will make the descriptor API can't be persisted in >>>>>> Catalog >>>>>>>> unless FLIP-80 is done. >>>>>>>> 3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps >>>>>>>> The original intention behind these APIs are providing shortcut APIs >>>>>> for >>>>>>>> Table API users. >>>>>>>> But I'm also fine to only provide the DDL-like methods if you have >>>>>>>> concerns. We can discuss shortcuts in the future if users request. >>>>>>>> 4) LikeOption >>>>>>>> LikeOption.INCLUDING.ALL is a constant (enum values). I have added >>>>>> more >>>>>>>> description about this in the FLIP. >>>>>>>> 5) implementation? >>>>>>>> I don't want to mention too much about implementation details in the >>>>>> FLIP >>>>>>>> at the beginning, because the API is already very long. >>>>>>>> But I also added an "Implementation" section to explain them. >>>>>>>> 6) static method vs new keyword >>>>>>>> Personally I prefer the new keyword because it makes the API cleaner. >>>>>> If we >>>>>>>> want remove new keyword and use static methods, we have to: >>>>>>>> Either adding a `Schema.builder()/create()` method as the starting >>>>>> method, >>>>>>>> Or duplicating all the methods as static methods, e.g. we have 12 >>>>>> methods >>>>>>>> in `Kafka`, any of them can be a starting method, then we will have 24 >>>>>>>> methods in `Kafka`. >>>>>>>> Both are not good, and it's hard to keep all the descriptors having >>>>>> the >>>>>>>> same starting method name, but all the descriptors can start from the >>>>>> same >>>>>>>> new keyword. >>>>>>>> Best, >>>>>>>> Jark >>>>>>>> On Thu, 9 Jul 2020 at 15:48, Dawid Wysakowicz <dwysakow...@apache.org >>>>>>>> <mailto:dwysakow...@apache.org> >>>>>>> >>>>>>>> wrote: >>>>>>>>> Correction to my point 4. The example is correct. I did not read it >>>>>>>>> carefully enough. Sorry for the confusion. Nevertheless I'd still >>>>>> like >>>>>>>>> to see a bit more explanation on the LikeOptions. >>>>>>>>> >>>>>>>>> On 07/07/2020 04:32, Jark Wu wrote: >>>>>>>>>> Hi everyone, >>>>>>>>>> >>>>>>>>>> Leonard and I prepared a FLIP about refactoring current Descriptor >>>>>> API, >>>>>>>>>> i.e. TableEnvironment#connect(). We would like to propose a new >>>>>>>>> descriptor >>>>>>>>>> API to register connectors in Table API. >>>>>>>>>> >>>>>>>>>> Since Flink 1.9, the community focused more on the new SQL DDL >>>>>> feature. >>>>>>>>>> After a series of releases, the SQL DDL is powerful and has many >>>>>> rich >>>>>>>>>> features now. However, Descriptor API (the >>>>>> `TableEnvironment#connect()`) >>>>>>>>>> has been stagnant for a long time and missing lots of core >>>>>> features, such >>>>>>>>>> as computed columns and primary keys. That's frustrating for Table >>>>>> API >>>>>>>>>> users who want to register tables programmatically. Besides, >>>>>> currently, a >>>>>>>>>> connector must implement a corresponding Descriptor (e.g. `new >>>>>> Kafka()`) >>>>>>>>>> before using the "connect" API. Therefore, we hope to reduce this >>>>>> effort >>>>>>>>>> for connector developers, that custom source/sinks can be >>>>>> registered via >>>>>>>>>> the descriptor API without implementing a Descriptor. >>>>>>>>>> >>>>>>>>>> These are the problems we want to resolve in this FLIP. I'm looking >>>>>>>>> forward >>>>>>>>>> to your comments. >>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connector+for+Table+API >>>>>> >>>>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connector+for+Table+API> >>>>>> >>>>>>>>>> >>>>>>>>>> Best, >>>>>>>>>> Jark >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>> >>>>>> >>>>>> >>>>> >>>>> -- >>>>> Best, Jingsong Lee >>>>> >>>> >>> >>