Hi Jark,

Thanks for the update. I think the FLIP looks really good at a high level.

I have a few comments on the code structure in the FLIP:

1) I really don't like how the TableDescriptor exposes protected fields.
Moreover, why do we need to extend from it? I don't think we need
KafkaConnector extends TableDescriptor and the like. We only need the
builders, e.g. the KafkaConnectorBuilder.

If I understand it correctly this is the interface needed from the
TableEnvironment perspective and it is the contract that the
TableEnvironment expects. I would suggest making it an interface:

    @PublicEvolving
    public interface TableDescriptor {
        List<String> getPartitionedFields();
        Schema getSchema();
        Map<String, String> getOptions();
        LikeOption[] getLikeOptions();
        String getLikePath();
    }

Then the TableDescriptorBuilder would work with an internal
implementation of this interface:

    @PublicEvolving
    public abstract class TableDescriptorBuilder<BUILDER extends TableDescriptorBuilder<BUILDER>> {

        private final InternalTableDescriptor descriptor = new InternalTableDescriptor();

        /**
         * Returns this builder instance, typed as the subclass.
         */
        protected abstract BUILDER self();

        /**
         * Specifies the table schema.
         */
        public BUILDER schema(Schema schema) {
            descriptor.schema = schema;
            return self();
        }

        /**
         * Specifies the partition keys of this table.
         */
        public BUILDER partitionedBy(String... fieldNames) {
            checkArgument(
                descriptor.partitionedFields.isEmpty(),
                "partitionedBy(...) shouldn't be called more than once.");
            descriptor.partitionedFields.addAll(Arrays.asList(fieldNames));
            return self();
        }

        /**
         * Extends some parts from the original registered table path.
         */
        public BUILDER like(String tablePath, LikeOption... likeOptions) {
            descriptor.likePath = tablePath;
            descriptor.likeOptions = likeOptions;
            return self();
        }

        protected BUILDER option(String key, String value) {
            descriptor.options.put(key, value);
            return self();
        }

        /**
         * Returns the created table descriptor.
         */
        public TableDescriptor build() {
            return descriptor;
        }
    }
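
To make the idea concrete, here is a minimal, self-contained sketch of how a connector builder could sit on top of such a base builder. It is only an illustration and not code from the FLIP or the POC: the interface is reduced to two getters, and the `DescriptorSketch` wrapper class, `KafkaConnectorBuilder#topic` and the "connector"/"topic" option keys are assumptions made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical wrapper class so the sketch compiles as a single file.
final class DescriptorSketch {

    /** Read-only contract seen by the TableEnvironment (reduced to two getters). */
    interface TableDescriptor {
        List<String> getPartitionedFields();
        Map<String, String> getOptions();
    }

    /** Internal mutable implementation; users never see this type. */
    private static final class InternalTableDescriptor implements TableDescriptor {
        final List<String> partitionedFields = new ArrayList<>();
        final Map<String, String> options = new HashMap<>();

        @Override
        public List<String> getPartitionedFields() {
            return Collections.unmodifiableList(partitionedFields);
        }

        @Override
        public Map<String, String> getOptions() {
            return Collections.unmodifiableMap(options);
        }
    }

    /** Base builder that owns all common methods. */
    abstract static class TableDescriptorBuilder<B extends TableDescriptorBuilder<B>> {
        private final InternalTableDescriptor descriptor = new InternalTableDescriptor();

        /** Returns this builder, typed as the concrete subclass. */
        protected abstract B self();

        public B partitionedBy(String... fieldNames) {
            descriptor.partitionedFields.addAll(Arrays.asList(fieldNames));
            return self();
        }

        protected B option(String key, String value) {
            descriptor.options.put(key, value);
            return self();
        }

        public TableDescriptor build() {
            return descriptor;
        }
    }

    /** Hypothetical connector builder: it only adds connector-specific methods. */
    static final class KafkaConnectorBuilder
            extends TableDescriptorBuilder<KafkaConnectorBuilder> {

        KafkaConnectorBuilder() {
            option("connector", "kafka"); // assumed option key and value
        }

        @Override
        protected KafkaConnectorBuilder self() {
            return this;
        }

        public KafkaConnectorBuilder topic(String topic) {
            return option("topic", topic); // assumed option key
        }
    }

    public static void main(String[] args) {
        TableDescriptor descriptor = new KafkaConnectorBuilder()
                .topic("user_logs")
                .partitionedBy("part_field_0")
                .build();
        System.out.println(descriptor.getOptions().get("topic"));
        System.out.println(descriptor.getPartitionedFields());
    }
}
```

With this shape there is no KafkaConnector class at all: the connector author writes one builder, and the caller only ever holds the TableDescriptor interface.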


2) I'm also not the biggest fan of how the LikeOptions are suggested in
the doc. Can't we have something more like:

    class LikeOption {

        public enum MergingStrategy {
            INCLUDING,
            EXCLUDING,
            OVERWRITING
        }

        public enum FeatureOption {
            ALL,
            CONSTRAINTS,
            GENERATED,
            OPTIONS,
            PARTITIONS,
            WATERMARKS
        }

        private final MergingStrategy mergingStrategy;
        private final FeatureOption featureOption;

        private LikeOption(MergingStrategy mergingStrategy, FeatureOption featureOption) {
            this.mergingStrategy = mergingStrategy;
            this.featureOption = featureOption;
        }

        public static final LikeOption including(FeatureOption option) {
            return new LikeOption(MergingStrategy.INCLUDING, option);
        }

        public static final LikeOption overwriting(FeatureOption option) {
            Preconditions.checkArgument(option != FeatureOption.ALL && ...);
            return new LikeOption(MergingStrategy.OVERWRITING, option);
        }
    }
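
As a usage sketch of the factory-method style above, the following self-contained example shows how such options could be created and validated. It is an assumption-laden illustration rather than a proposal for the final class: the `LikeOptionSketch` wrapper, the `excluding(...)` method, the getters and `toString()` are added for the example, and the only check implemented in `overwriting(...)` is the `ALL` case; the further conditions elided above are not reconstructed here.

```java
// Hypothetical wrapper class so the sketch compiles as a single file.
final class LikeOptionSketch {

    enum MergingStrategy { INCLUDING, EXCLUDING, OVERWRITING }

    enum FeatureOption { ALL, CONSTRAINTS, GENERATED, OPTIONS, PARTITIONS, WATERMARKS }

    static final class LikeOption {
        private final MergingStrategy mergingStrategy;
        private final FeatureOption featureOption;

        private LikeOption(MergingStrategy mergingStrategy, FeatureOption featureOption) {
            this.mergingStrategy = mergingStrategy;
            this.featureOption = featureOption;
        }

        static LikeOption including(FeatureOption option) {
            return new LikeOption(MergingStrategy.INCLUDING, option);
        }

        static LikeOption excluding(FeatureOption option) {
            return new LikeOption(MergingStrategy.EXCLUDING, option);
        }

        static LikeOption overwriting(FeatureOption option) {
            // Only the ALL check is implemented; any other restriction is left out.
            if (option == FeatureOption.ALL) {
                throw new IllegalArgumentException("OVERWRITING cannot be combined with ALL.");
            }
            return new LikeOption(MergingStrategy.OVERWRITING, option);
        }

        MergingStrategy getMergingStrategy() {
            return mergingStrategy;
        }

        FeatureOption getFeatureOption() {
            return featureOption;
        }

        @Override
        public String toString() {
            // Mirrors the SQL spelling, e.g. "INCLUDING ALL".
            return mergingStrategy + " " + featureOption;
        }
    }

    public static void main(String[] args) {
        LikeOption[] options = {
            LikeOption.including(FeatureOption.ALL),
            LikeOption.overwriting(FeatureOption.OPTIONS)
        };
        for (LikeOption option : options) {
            System.out.println(option); // INCLUDING ALL, then OVERWRITING OPTIONS
        }
    }
}
```

An invalid combination is then rejected when the option is created, instead of surfacing later when the descriptor is translated.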


3) TableEnvironment#from(descriptor) will register descriptor under a
system generated table path (just like TableImpl#toString) first, and
scan from the table path to derive the Table. Table#executeInsert() does
it in the similar way.

I would try not to register the table under a generated table path. Do
we really need that? I am pretty sure we can use the tables without
registering them in a catalog. Similarly to the old
TableSourceQueryOperation.

Otherwise looks good.

Best,

Dawid

On 23/07/2020 10:35, Timo Walther wrote:
> Hi Jark,
>
> thanks for the update. I think the FLIP is in a really good shape now
> and ready to be voted. If others have no further comments?
>
> I have one last comment around the methods of the descriptor builders.
> When refactoring classes such as `KafkaConnector` or
> `ElasticsearchConnector`, we should align the method names with the
> new property names introduced in FLIP-122:
>
> KafkaConnector.newBuilder()
>   // similar to scan.startup.mode=earliest-offset
>   .scanStartupModeEarliest()
>   // similar to sink.partitioner=round-robin
>   .sinkPartitionerRoundRobin()
>
> What do you think?
>
> Thanks for driving this,
> Timo
>
>
> On 22.07.20 17:26, Jark Wu wrote:
>> Hi all,
>>
>> After some offline discussion with other people, I'm also fine with
>> using the builder pattern now, even though I still think the
>> `.build()` method is a little verbose in the user code.
>>
>> I have updated the FLIP with the following changes:
>>
>> 1) Use the builder pattern instead of the "new" keyword. In order to
>>    avoid duplicate code and reduce the development burden for
>>    connector developers, I introduced the abstract classes
>>    `TableDescriptorBuilder` and `FormatDescriptorBuilder`.
>>    All the common methods are pre-defined in the base builder classes,
>>    and all custom descriptor builders should extend from them.
>>    We can add more methods to the base builder classes in the future
>>    without changes in the connectors.
>> 2) Use Expression instead of SQL expression string for computed
>>    columns and the watermark strategy.
>> 3) Use `watermark(rowtime, expr)` as the watermark method.
>> 4) `Schema.column()` accepts `AbstractDataType` instead of `DataType`.
>> 5) Drop Schema#proctime and
>>    Schema#watermarkFor#boundedOutOfOrderTimestamps.
>>
>> A full example will look like this:
>>
>> tEnv.createTemporaryTable(
>>      "MyTable",
>>      KafkaConnector.newBuilder()
>>          .version("0.11")
>>          .topic("user_logs")
>>          .property("bootstrap.servers", "localhost:9092")
>>          .property("group.id", "test-group")
>>          .startFromEarliest()
>>          .sinkPartitionerRoundRobin()
>>          .format(JsonFormat.newBuilder().ignoreParseErrors(false).build())
>>          .schema(
>>              Schema.newBuilder()
>>                  .column("user_id", DataTypes.BIGINT())
>>                  .column("user_name", DataTypes.STRING())
>>                  .column("score", DataTypes.DECIMAL(10, 2))
>>                  .column("log_ts", DataTypes.STRING())
>>                  .column("part_field_0", DataTypes.STRING())
>>                  .column("part_field_1", DataTypes.INT())
>>                  // define a processing-time attribute with column name "proc"
>>                  .column("proc", proctime())
>>                  .column("ts", toTimestamp($("log_ts")))
>>                  .watermark("ts", $("ts").minus(lit(3).seconds()))
>>                  .primaryKey("user_id")
>>                  .build())
>>          // Kafka doesn't support partitioned tables yet, this is just an
>>          // example for the API
>>          .partitionedBy("part_field_0", "part_field_1")
>>          .build()
>> );
>>
>> I hope this resolves all your concerns. Welcome for further feedback!
>>
>> Updated FLIP:
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API#FLIP129:RefactorDescriptorAPItoregisterconnectorsinTableAPI-FormatDescriptor&FormatDescriptorBuilder
>>
>>
>> POC:
>> https://github.com/wuchong/flink/tree/descriptor-POC/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptor3
>>
>>
>> Best,
>> Jark
>>
>> On Thu, 16 Jul 2020 at 20:18, Jark Wu <imj...@gmail.com> wrote:
>>
>>> Thank you all for the discussion!
>>>
>>> Here are my comments:
>>>
>>> 2) I agree we should support Expression as a computed column. But I'm
>>> in favor of Leonard's point that maybe we can also support SQL string
>>> expressions as computed columns, because that also keeps it aligned
>>> with DDL. The concern with Expression is that converting an Expression
>>> to a SQL string, or (de)serializing an Expression, is another topic
>>> that is not clear yet and may involve lots of work.
>>> Maybe we can support Expression later if time permits.
>>>
>>> 6,7) I still prefer the "new" keyword over builder. I don't think
>>> immutability is a strong reason. I care more about usability and
>>> experience from the users' and devs' perspective.
>>>    - Users need to type more if using a builder:
>>>      `KafkaConnector.newBuilder()...build()` vs `new KafkaConnector()...`
>>>    - It's more difficult for developers to write a descriptor: 2
>>>      classes (KafkaConnector and KafkaConnectorBuilder) + 5 more
>>>      methods (builders, schema, partitionedBy, like, etc.).
>>>      With the "new" keyword all the common methods are defined by the
>>>      framework.
>>>    - It's hard to have the same API style for different connectors,
>>>      because the common methods are defined by users. For example,
>>>      some may have `withSchema`, `partitionKey`, `withLike`, etc.
>>>
>>> 8) I'm -1 to `ConfigOption`. The ConfigOption is not used on
>>> `JsonFormat`, but on the generic `Connector#option`. This doesn't work
>>> when using format options.
>>>
>>> // This is wrong, because "kafka" requires "json.ignore-parse-errors"
>>> // as the option key, not "ignore-parse-errors".
>>> new Connector("kafka")
>>>   .option(JsonOptions.IGNORE_PARSE_ERRORS, true);
>>>
>>>
>>> ========================================
>>> Hi Timo, regarding having a completely new stack, I have thought about
>>> that, but I still prefer to refactor the existing stack. The reason is
>>> that it will be more confusing if users see two similar stacks, and
>>> they may run into many problems when using the wrong class.
>>> For example, we may have two `Schema` and `TableDescriptor` classes.
>>> The `KafkaConnector` can't be used in the legacy `connect()` API, and
>>> the legacy `Kafka` class can't be used in the new
>>> `createTemporaryTable()` API.
>>> Besides, the existing API has been deprecated in 1.11, so I think it's
>>> fine to remove it in 1.12.
>>>
>>>
>>> Best,
>>> Jark
>>>
>>>
>>> On Thu, 16 Jul 2020 at 15:26, Jingsong Li <jingsongl...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for the discussion.
>>>>
>>>> Descriptor lacks the watermark and the computed column is too long.
>>>>
>>>> 1) +1 for just `column(...)`
>>>>
>>>> 2) +1 for being consistent with Table API; the Java Table API should
>>>> use the Expression DSL. We don't need pure string support, users
>>>> should just use DDL instead. I think this is just a schema descriptor?
>>>> The schema descriptor should be consistent with DDL, so it should
>>>> definitely contain computed column information.
>>>>
>>>> 3) +1 for not containing Schema#proctime and
>>>> Schema#watermarkFor#boundedOutOfOrderTimestamps; we can just leave
>>>> them in the legacy APIs.
>>>>
>>>> 6,7) +1 for removing "new", using a builder and making it immutable.
>>>> For Jark: the starting method is the static method, the others are
>>>> not.
>>>>
>>>> 8) +1 for `ConfigOption`, this can be consistent with
>>>> `WritableConfig`.
>>>> For Leonard: I don't think users need "json.fail-on-missing-field";
>>>> they should only need "fail-on-missing-field". The recommended way is
>>>> "JsonFormat.newInstance().option(....)", which configures options in
>>>> the format scope.
>>>>
>>>> Best,
>>>> Jingsong
>>>>
>>>> On Wed, Jul 15, 2020 at 9:30 PM Leonard Xu <xbjt...@gmail.com> wrote:
>>>>
>>>>> Thanks Jark for bringing up this discussion and organizing the FLIP
>>>>> document.
>>>>>
>>>>> Thanks Dawid and Timo for the feedback. Here are my thoughts.
>>>>>
>>>>> 1)  I’m +1 with using column() for both cases.
>>>>>
>>>>> 2) Expression DSL vs pure SQL string for computed columns
>>>>>
>>>>> I think we can support them both and implement the pure SQL string
>>>>> first. I agree that the Expression DSL brings more possibilities and
>>>>> flexibility, but using a SQL string is a more unified way which can
>>>>> reuse most logic with DDL, like validation and persistence in the
>>>>> Catalog. Converting Expression DSL to a SQL expression is another big
>>>>> topic, and I have not figured out a feasible approach so far.
>>>>> So maybe we can postpone the Expression DSL support, given the
>>>>> reality.
>>>>>
>>>>> 3) Methods Schema#proctime and
>>>>> Schema#watermarkFor#boundedOutOfOrderTimestamps
>>>>>
>>>>>   +1 with Dawid's proposal to offer SQL-like methods.
>>>>>   Schema()
>>>>>      .column("proctime", proctime())
>>>>>      .watermarkFor("rowtime", $("rowtime").minus(lit(3).seconds()))
>>>>> And we can simplify watermarkFor("colName", Expression
>>>>> watermarkStrategy) to watermark("colName", Expression
>>>>> watermarkStrategy). I think the latter expresses the meaning of
>>>>> "WATERMARK FOR column_name AS watermark_strategy_expression" well.
>>>>>
>>>>> 5)6)7) The new keyword vs the static method vs builder pattern
>>>>>
>>>>> I have no strong preference. The new keyword and the static method on
>>>>> the descriptor can nearly be treated as a builder and do the same
>>>>> things as a builder.
>>>>> For the builder pattern, we will introduce six methods
>>>>> (connector.Builder(), connector.Builder.build(), format.Builder(),
>>>>> format.Builder.build(), Schema.Builder(), Schema.Builder.build()).
>>>>> I think we could avoid these unnecessary methods. I'm slightly +1 for
>>>>> the new keyword if we need to choose.
>>>>>
>>>>> 8) `Connector.option(...)` class should also accept `ConfigOption`
>>>>> I'm slightly -1 for this. ConfigOption may not work because the key
>>>>> of a format ConfigOption has no format prefix, e.g. for
>>>>> FAIL_ON_MISSING_FIELD of json we need "json.fail-on-missing-field"
>>>>> rather than "fail-on-missing-field".
>>>>>
>>>>> public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD =
>>>>> ConfigOptions
>>>>>          .key("fail-on-missing-field")
>>>>>          .booleanType()
>>>>>          .defaultValue(false);
>>>>>
>>>>> WDYT?
>>>>>
>>>>> Best,
>>>>> Leonard Xu
>>>>>
>>>>>
>>>>>> On 15 Jul 2020, at 16:37, Timo Walther <twal...@apache.org> wrote:
>>>>>>
>>>>>> Hi Jark,
>>>>>>
>>>>>> thanks for working on this issue. It is time to fix this last
>>>>>> part of inconsistency in the API. I also like the core parts of the
>>>>>> FLIP, esp. that TableDescriptor is one entity that can be passed to
>>>>>> different methods. Here is some feedback from my side:
>>>>>>
>>>>>> 1) +1 for just `column(...)`
>>>>>>
>>>>>> 2) Expression DSL vs pure SQL string for computed columns
>>>>>> I agree with Dawid. Using the Expression DSL is desirable for a
>>>>>> consistent API. Furthermore, otherwise people need to register
>>>>>> functions if they want to use them in an expression. Refactoring
>>>>>> TableSchema is definitely on the list for 1.12. Maybe we can come up
>>>>>> with some intermediate solution where we transform the expression to
>>>>>> a SQL expression for the catalog, until the discussions around
>>>>>> FLIP-80 and CatalogTableSchema have been finalized.
>>>>>>
>>>>>> 3) Schema#proctime and
>>>>>> Schema#watermarkFor#boundedOutOfOrderTimestamps
>>>>>> We should design the descriptor very close to the SQL syntax. The
>>>>>> more similar the syntax, the more likely it is to keep the new
>>>>>> descriptor API stable.
>>>>>>
>>>>>> 6) static method vs new keyword
>>>>>> Actually, the `new` keyword was one of the things that bothered me
>>>>>> most in the old design. Fluent APIs avoid this nowadays.
>>>>>>
>>>>>> 7) make the descriptors immutable with builders
>>>>>> The descriptors are some kind of builders already, but they are not
>>>>>> called "builder". Instead of coming up with the new concept of a
>>>>>> "descriptor", we should use terminology that people, esp.
>>>>>> Java/Scala users, are familiar with already.
>>>>>>
>>>>>> We could make the descriptors immutable to pass them around easily.
>>>>>>
>>>>>> Btw "Connector" and "Format" should always be in the class name.
>>>>>> This was also a mistake in the past. Instead of calling the
>>>>>> descriptor just `Kafka` we could call it `KafkaConnector`. An entire
>>>>>> example could look like:
>>>>>>
>>>>>> tEnv.createTemporaryTable(
>>>>>>    "OrdersInKafka",
>>>>>>    KafkaConnector.newBuilder() // builder pattern supported by IDE
>>>>>>       .topic("user_logs")
>>>>>>       .property("bootstrap.servers", "localhost:9092")
>>>>>>       .property("group.id", "test-group")
>>>>>>       .format(JsonFormat.newInstance()) // shortcut for no parameters
>>>>>>       .schema(
>>>>>>          Schema.newBuilder()
>>>>>>             .column("user_id", DataTypes.BIGINT())
>>>>>>             .column("score", DataTypes.DECIMAL(10, 2))
>>>>>>             .column("log_ts", DataTypes.TIMESTAMP(3))
>>>>>>             .column("my_ts", toTimestamp($("log_ts")))
>>>>>>             .build()
>>>>>>       )
>>>>>>       .build()
>>>>>> );
>>>>>>
>>>>>> Instead of refactoring the existing classes, we could also think
>>>>>> about a completely new stack. I think this would avoid confusion for
>>>>>> the old users. We could deprecate the entire `Kafka` class instead of
>>>>>> dealing with backwards compatibility.
>>>>>>
>>>>>> 8) minor extensions
>>>>>> A general `Connector.option(...)` class should also accept
>>>>>> `ConfigOption` instead of only strings.
>>>>>> A `Schema.column()` should accept `AbstractDataType` that can be
>>>>>> resolved to a `DataType` by access to a `DataTypeFactory`.
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Thanks,
>>>>>> Timo
>>>>>>
>>>>>>
>>>>>> On 09.07.20 18:51, Jark Wu wrote:
>>>>>>> Hi Dawid,
>>>>>>> Thanks for the great feedback! Here are my responses:
>>>>>>> 1) computedColumn(..) vs column(..)
>>>>>>> I'm fine to use `column(..)` in both cases.
>>>>>>> 2) Expression DSL vs pure SQL string for computed columns
>>>>>>> This is a good point. Actually, I also prefer to use Expression DSL
>>>>>>> because this is more Table API style.
>>>>>>> However, this requires modifying TableSchema again to accept &
>>>>>>> expose Expression as computed columns.
>>>>>>> I'm not convinced about this, because AFAIK, we want to have a
>>>>>>> CatalogTableSchema to hold this information
>>>>>>> and don't want to extend TableSchema. Maybe Timo can give some
>>>>>>> points here.
>>>>>>> Besides, this would mean descriptors can't be persisted in the
>>>>>>> Catalog unless FLIP-80 is done.
>>>>>>> 3) Schema#proctime and
>>>>>>> Schema#watermarkFor#boundedOutOfOrderTimestamps
>>>>>>> The original intention behind these APIs is to provide shortcut
>>>>>>> APIs for Table API users.
>>>>>>> But I'm also fine to only provide the DDL-like methods if you have
>>>>>>> concerns. We can discuss shortcuts in the future if users request
>>>>>>> them.
>>>>>>> 4) LikeOption
>>>>>>> LikeOption.INCLUDING.ALL is a constant (enum value). I have added
>>>>>>> more description about this in the FLIP.
>>>>>>> 5) implementation?
>>>>>>> I don't want to mention too much about implementation details in
>>>>>>> the FLIP at the beginning, because the API is already very long.
>>>>>>> But I also added an "Implementation" section to explain them.
>>>>>>> 6) static method vs new keyword
>>>>>>> Personally I prefer the new keyword because it makes the API
>>>>>>> cleaner. If we want to remove the new keyword and use static
>>>>>>> methods, we have to:
>>>>>>> Either add a `Schema.builder()/create()` method as the starting
>>>>>>> method,
>>>>>>> Or duplicate all the methods as static methods, e.g. we have 12
>>>>>>> methods in `Kafka`, any of them can be a starting method, so we
>>>>>>> will have 24 methods in `Kafka`.
>>>>>>> Neither is good, and it's hard to keep the same starting method
>>>>>>> name across all descriptors, whereas all descriptors can start
>>>>>>> with the same new keyword.
>>>>>>> Best,
>>>>>>> Jark
>>>>>>> On Thu, 9 Jul 2020 at 15:48, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>>>>>>> Correction to my point 4. The example is correct. I did not read
>>>>>>>> it carefully enough. Sorry for the confusion. Nevertheless I'd
>>>>>>>> still like to see a bit more explanation on the LikeOptions.
>>>>>>>>
>>>>>>>> On 07/07/2020 04:32, Jark Wu wrote:
>>>>>>>>> Hi everyone,
>>>>>>>>>
>>>>>>>>> Leonard and I prepared a FLIP about refactoring the current
>>>>>>>>> Descriptor API, i.e. TableEnvironment#connect(). We would like to
>>>>>>>>> propose a new descriptor API to register connectors in Table API.
>>>>>>>>>
>>>>>>>>> Since Flink 1.9, the community has focused more on the new SQL
>>>>>>>>> DDL feature. After a series of releases, the SQL DDL is powerful
>>>>>>>>> and has many rich features now. However, the Descriptor API (the
>>>>>>>>> `TableEnvironment#connect()`) has been stagnant for a long time
>>>>>>>>> and is missing lots of core features, such as computed columns
>>>>>>>>> and primary keys. That's frustrating for Table API users who want
>>>>>>>>> to register tables programmatically. Besides, currently, a
>>>>>>>>> connector must implement a corresponding Descriptor (e.g. `new
>>>>>>>>> Kafka()`) before using the "connect" API. Therefore, we hope to
>>>>>>>>> reduce this effort for connector developers, so that custom
>>>>>>>>> sources/sinks can be registered via the descriptor API without
>>>>>>>>> implementing a Descriptor.
>>>>>>>>>
>>>>>>>>> These are the problems we want to resolve in this FLIP. I'm
>>>>>>>>> looking forward to your comments.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connector+for+Table+API
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jark
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>> -- 
>>>> Best, Jingsong Lee
>>>>
>>>
>>
>
