Hi all,
I'm happy to see so much interest in easing the integration with JDBC data
sources. This may be a rare situation (though not in my experience), but
what if I have to connect to the same type of source (e.g. MySQL) with two
incompatible versions? How can I load the two (or more) connector jars
without causing conflicts?
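
To make the question concrete, here is a minimal sketch of the kind of
isolation I have in mind: one class loader per driver jar, so the two driver
versions never share a classpath. This is only an illustration under my own
assumptions (Java 9+, and the helper class `IsolatedDriverLoader` is a
hypothetical name); it is not something the FLIP proposes or that Flink
provides today.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.Driver;
import java.util.Properties;

/** Hypothetical helper: loads each JDBC driver jar in its own class loader. */
final class IsolatedDriverLoader {

    /** Creates a loader that sees only the given jar plus the JDK classes. */
    static URLClassLoader loaderFor(Path driverJar) throws Exception {
        URL[] urls = { driverJar.toUri().toURL() };
        // The parent is the platform loader (Java 9+), not the application
        // classpath, so a driver on the application classpath cannot shadow
        // the one packaged in this jar.
        return new URLClassLoader(urls, ClassLoader.getPlatformClassLoader());
    }

    /** Instantiates the named driver class from the isolated loader. */
    static Driver loadDriver(URLClassLoader loader, String driverClassName)
            throws Exception {
        Class<?> clazz = Class.forName(driverClassName, true, loader);
        return (Driver) clazz.getDeclaredConstructor().newInstance();
    }

    /**
     * Connects through the driver instance directly. DriverManager skips
     * drivers that are not visible from the caller's class loader, so it is
     * bypassed here.
     */
    static Connection connect(Driver driver, String url, String user,
                              String password) throws Exception {
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", password);
        return driver.connect(url, props);
    }
}
```

With MySQL, for example, I would call `loaderFor` once per connector jar
(the jar locations being whatever the deployment uses) and then `loadDriver`
with `com.mysql.jdbc.Driver` for Connector/J 5.x and
`com.mysql.cj.jdbc.Driver` for 8.x. Whether the catalog could do something
similar internally is exactly my question.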

On Tue, Jan 14, 2020, 23:32 Bowen Li <bowenl...@gmail.com> wrote:

> Hi devs,
>
> I've updated the wiki according to the feedback. Please take another look.
>
> Thanks!
>
>
> On Fri, Jan 10, 2020 at 2:24 PM Bowen Li <bowenl...@gmail.com> wrote:
>
> > Thanks everyone for the prompt feedback. Please see my response below.
> >
> > > In Postgres, the TIME/TIMESTAMP WITH TIME ZONE has the
> > java.time.Instant semantic, and should be mapped to Flink's
> > TIME/TIMESTAMP WITH LOCAL TIME ZONE
> >
> > Zhenghua, you are right that pg's 'timestamp with timezone' should be
> > translated into Flink's 'timestamp with local timezone'. I can't find
> > 'time with (local) timezone' though, so we may not support that type from
> > pg in Flink.
> >
> > > I suggest that the parameters be completely consistent with the
> > JDBCTableSource / JDBCTableSink. If you take a look at the JDBC API
> > "DriverManager.getConnection", it allows the "default db, username, pwd"
> > things to be optional. They can be included in the URL. Of course the JDBC
> > API also allows establishing connections to different databases in a db
> > instance. So I think we don't need to provide a "base_url", we can just
> > provide a real "url", to be consistent with the JDBC API.
> >
> > Jingsong, what I'm saying is that a builder can be added on demand later
> > if enough users request it, and it doesn't need to be a core part of
> > the FLIP.
> >
> > Besides, unfortunately Postgres doesn't allow changing databases via JDBC.
> >
> > JDBC provides different connecting options as you mentioned, but I'd like
> > to keep our design and API simple and avoid having to handle extra parsing
> > logic. And it doesn't shut the door on what you proposed as a future effort.
> >
> > > Since PostgreSQL does not have catalogs but schemas under a database,
> > why not map the PG database to a Flink catalog and the PG schema to a
> > Flink database
> >
> > Danny, because 1) there are frequent use cases where users want to switch
> > databases or reference objects across databases in a pg instance 2)
> > schema is an optional namespace layer in pg, it always has a default value
> > ("public") and can be invisible to users if they'd like, as shown in the
> > FLIP 3) as you mentioned it is specific to Postgres, and I don't feel it's
> > necessary to map Postgres substantially differently from other DBMSs with
> > additional complexity
> >
> > > 'base_url' configuration: We are following the configuration format
> > guideline [1] which suggests using dash (-) instead of underscore (_). And
> > I'm a little confused about the meaning of "base_url" at first glance,
> > another idea is to split it into several configurations: 'driver',
> > 'hostname', 'port'.
> >
> > Jark, I agree we should use "base-url" in yaml config.
> >
> > I'm not sure about having hostname and port separately because you can
> > specify multiple hosts with ports in jdbc, like
> > "jdbc:dbms/host1:port1,host2:port2/", for connection failovers. Separating
> > them would make configurations harder.
> >
> > I will add clear doc and example to avoid any possible confusion.
> >
> > > 'default-database' is optional, then which database will be used or what
> > is the behavior when the default database is not selected.
> >
> > This should be DBMS specific. For postgres, it will be the <access id>
> > database.
> >
> >
> > On Thu, Jan 9, 2020 at 9:48 PM Zhenghua Gao <doc...@gmail.com> wrote:
> >
> >> Hi Bowen, Thanks for driving this.
> >> I think it would be very convenient to use tables in external DBs with
> >> JDBC Catalog.
> >>
> >> I have one concern about "Flink-Postgres Data Type Mapping" part:
> >>
> >> In Postgres, the TIME/TIMESTAMP WITH TIME ZONE has the java.time.Instant
> >> semantic, and should be mapped to Flink's TIME/TIMESTAMP WITH LOCAL TIME
> >> ZONE
> >>
> >> *Best Regards,*
> >> *Zhenghua Gao*
> >>
> >>
> >> On Fri, Jan 10, 2020 at 11:09 AM Jingsong Li <jingsongl...@gmail.com>
> >> wrote:
> >>
> >> > Hi Bowen, thanks for the reply and the update.
> >> >
> >> > > I don't see much value in providing a builder for jdbc catalogs, as
> >> > they only have 4 or 5 required params, no optional ones. I prefer users
> >> > just provide a base url without default db, username, pwd so we don't
> >> > need to parse url all around, as I mentioned jdbc catalog may need to
> >> > establish connections to different databases in a db instance.
> >> >
> >> > I suggest that the parameters be completely consistent with the
> >> > JDBCTableSource / JDBCTableSink.
> >> > If you take a look at the JDBC API "DriverManager.getConnection",
> >> > it allows the "default db, username, pwd" things to be optional. They
> >> > can be included in the URL. Of course the JDBC API also allows
> >> > establishing connections to different databases in a db instance.
> >> > So I think we don't need to provide a "base_url", we can just provide a
> >> > real "url", to be consistent with the JDBC API.
> >> >
> >> > Best,
> >> > Jingsong Lee
> >> >
> >> > On Fri, Jan 10, 2020 at 10:34 AM Jark Wu <imj...@gmail.com> wrote:
> >> >
> >> > > Thanks Bowen for the reply,
> >> > >
> >> > > A user-facing JDBCCatalog and 'catalog.type' = 'jdbc' sounds good to me.
> >> > >
> >> > > I have some other minor comments when I went through the updated
> >> > > documentation:
> >> > >
> >> > > 1) 'base_url' configuration: We are following the configuration format
> >> > > guideline [1] which suggests using dash (-) instead of underscore (_).
> >> > >      And I'm a little confused about the meaning of "base_url" at first
> >> > > glance, another idea is to split it into several configurations:
> >> > > 'driver', 'hostname', 'port'.
> >> > >
> >> > > 2) 'default-database' is optional, then which database will be used or
> >> > > what is the behavior when the default database is not selected.
> >> > >
> >> > > 3) a builder for jdbc catalogs: I agree with Jingsong to provide a
> >> > > builder, because there is optional configuration here (the default
> >> > > database), and providing a Builder as the API will be easier for
> >> > > evolution, I'm not sure we won't add/modify parameters in the future.
> >> > >
> >> > > [1]:
> >> > > https://flink.apache.org/contributing/code-style-and-quality-components.html#configuration-changes
> >> > >
> >> > > On Fri, 10 Jan 2020 at 04:52, Bowen Li <bowenl...@gmail.com> wrote:
> >> > >
> >> > > > Hi Jark and Jingsong,
> >> > > >
> >> > > > Thanks for your review. Please see my reply in line.
> >> > > >
> >> > > > > why introducing a `PostgresJDBCCatalog`, not a generic
> >> > > > `JDBCCatalog` (catalog.type = 'postgres' vs 'jdbc') ?
> >> > > >
> >> > > > Thanks for the reminder, I looked at JDBCDialect. A generic,
> >> > > > user-facing JDBCCatalog with catalog.type = jdbc that finds specific db
> >> > > > implementations (pg vs. mysql vs. ...) is more aligned with how the jdbc
> >> > > > sink/source is handled, indeed. However, the catalogs would also need to
> >> > > > execute the query and parse query results in a db-dependent way. E.g.
> >> > > > jdbc catalog needs to establish connections to different databases
> >> > > > within a db instance on demand. So just having JDBCDialect won't be
> >> > > > enough.
> >> > > >
> >> > > > I think we can do the following:
> >> > > >   - provide a user-facing JDBCCatalog, composing a db-specific impl
> >> > > > like PostgresJDBCCatalog and MySQLJDBCCatalog. Users still specify
> >> > > > "jdbc" as type in both Table API and SQL CLI, internally it will create
> >> > > > a db-specific impl depending on jdbc base url.
> >> > > >   - some statements can reside in JDBCDialect. Query execution and
> >> > > > result parsing logic would be located in db-specific impls.
> >> > > >
> >> > > > - We can provide a Builder for Catalog. In my opinion, defaultDatabase,
> >> > > > username, pwd can be included in JDBC DB url.
> >> > > >
> >> > > > I don't see much value in providing a builder for jdbc catalogs, as
> >> > > > they only have 4 or 5 required params, no optional ones. I prefer users
> >> > > > just provide a base url without default db, username, pwd so we don't
> >> > > > need to parse url all around, as I mentioned jdbc catalog may need to
> >> > > > establish connections to different databases in a db instance.
> >> > > >
> >> > > > - About timestamp and time, write down the specific Flink precision of
> >> > > > Postgres?
> >> > > >
> >> > > > I've documented that. It's 0-6
> >> > > >
> >> > > > - I think there is a part missing in your document, that is how to use
> >> > > > this catalog. If you can write a complete example, I think it will be
> >> > > > much clearer.
> >> > > >
> >> > > > I added some examples in both table api and SQL Cli. It will be no
> >> > > > different from existing catalogs.
> >> > > >
> >> > > > - So a thing is what TableFactory will this catalog use? For example,
> >> > > > JDBCTableSourceSinkFactory has different parameters for source or sink?
> >> > > > What do you think about it?
> >> > > >
> >> > > > This catalog will directly call JDBCTableSourceSinkFactory without
> >> > > > going through service discovery because we are sure it's a jdbc table.
> >> > > > I added it to the doc.
> >> > > >
> >> > > > For the different params besides schema, as we discussed offline,
> >> > > > unfortunately we can't do anything right now until Flink DDL/DML are
> >> > > > able to distinguish 3 types of params - external data's metadata,
> >> > > > source/sink runtime params, and Flink semantics params. The latter two
> >> > > > can't be provided by catalogs. The problem is actually general to all
> >> > > > catalogs, not just JDBCCatalog. I'm pushing for such an effort to solve
> >> > > > it. At this moment we can only use some default params for some cases,
> >> > > > and the other cases cannot take advantage of the JDBC catalog and users
> >> > > > still have to write DDL manually.
> >> > > >
> >> > > > Thanks,
> >> > > > Bowen
> >> > > >
> >> > > > On Wed, Jan 8, 2020 at 7:46 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> >> > > >
> >> > > > > Thanks Bowen for driving this,
> >> > > > >
> >> > > > > +1 for this. The DDL schema definition is a headache for users, and
> >> > > > > a catalog is a solution to this problem.
> >> > > > >
> >> > > > > I have some questions and suggestions:
> >> > > > >
> >> > > > > - We can provide a Builder for Catalog. In my opinion,
> >> > > > > defaultDatabase, username, pwd can be included in JDBC DB url.
> >> > > > >
> >> > > > > - About timestamp and time, write down the specific Flink precision
> >> > > > > of Postgres?
> >> > > > >
> >> > > > > - I think there is a part missing in your document, that is how to
> >> > > > > use this catalog. If you can write a complete example, I think it will
> >> > > > > be much clearer.
> >> > > > >
> >> > > > > - So a thing is what TableFactory will this catalog use? For example,
> >> > > > > JDBCTableSourceSinkFactory has different parameters for source or
> >> > > > > sink? What do you think about it?
> >> > > > >
> >> > > > > Best,
> >> > > > > Jingsong Lee
> >> > > > >
> >> > > > > On Thu, Jan 9, 2020 at 11:33 AM Jark Wu <imj...@gmail.com> wrote:
> >> > > > >
> >> > > > > > Thanks Bowen for driving this.
> >> > > > > >
> >> > > > > > +1 to this feature.
> >> > > > > >
> >> > > > > > My concern is why we are introducing a `PostgresJDBCCatalog`, not a
> >> > > > > > generic `JDBCCatalog` (catalog.type = 'postgres' vs 'jdbc')?
> >> > > > > > From my understanding, JDBC catalog is similar to JDBC source/sink.
> >> > > > > > For JDBC source/sink, we have a generic implementation for JDBC and
> >> > > > > > delegate operations to JDBCDialect. Different drivers may have
> >> > > > > > different implementations of JDBCDialect, e.g. `quoteIdentifier()`.
> >> > > > > >
> >> > > > > > For JDBC catalog, I guess maybe we can do it in the same way, i.e. a
> >> > > > > > generic JDBCCatalog implementation that delegates
> >> > > > > > operations to JDBCDialect, and we will have `listDataBase()`,
> >> > > > > > `listTables()` interfaces in JDBCDialect. The benefits are:
> >> > > > > > 0) reuse the existing `JDBCDialect`, I guess JDBCCatalog also needs to
> >> > > > > > quote identifiers.
> >> > > > > > 1) we can easily support a new database catalog (e.g. mysql) by
> >> > > > > > implementing new dialects (e.g. MySQLDialect).
> >> > > > > > 2) this can keep the same behavior as JDBC source/sink, i.e.
> >> > > > > > connector.type=jdbc, catalog.type=jdbc
> >> > > > > >
> >> > > > > > Best,
> >> > > > > > Jark
> >> > > > > >
> >> > > > > >
> >> > > > > > On Thu, 9 Jan 2020 at 08:33, Bowen Li <bowenl...@gmail.com> wrote:
> >> > > > > >
> >> > > > > > > Hi dev,
> >> > > > > > >
> >> > > > > > > I'd like to kick off a discussion on adding JDBC catalogs,
> >> > > > > > > specifically Postgres catalog in Flink [1].
> >> > > > > > >
> >> > > > > > > Currently users have to manually create schemas in Flink source/sink
> >> > > > > > > mirroring tables in their relational databases in use cases like JDBC
> >> > > > > > > read/write and consuming CDC. Many users have complained about the
> >> > > > > > > unnecessary, redundant, manual work. Any mismatch can lead to a
> >> > > > > > > failing Flink job at runtime instead of compile time. All these have
> >> > > > > > > been quite unpleasant, resulting in a broken user experience.
> >> > > > > > >
> >> > > > > > > We want to provide a JDBC catalog interface and a Postgres
> >> > > > > > > implementation for Flink as a start to connect to all kinds of
> >> > > > > > > relational databases, enabling Flink SQL to 1) retrieve table schema
> >> > > > > > > automatically without requiring users to write duplicated DDL 2) check
> >> > > > > > > at compile time for schema errors.
> >> > > > > > > It will greatly streamline user experiences when using Flink to deal
> >> > > > > > > with popular relational databases like Postgres, MySQL, MariaDB, AWS
> >> > > > > > > Aurora, etc.
> >> > > > > > >
> >> > > > > > > Note that the problem and solution are actually very general to Flink
> >> > > > > > > when connecting to all kinds of external systems. We just focus on
> >> > > > > > > solving that for relational databases in this FLIP.
> >> > > > > > >
> >> > > > > > > Thanks,
> >> > > > > > > Bowen
> >> > > > > > >
> >> > > > > > > [1]
> >> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+JDBC+catalog+and+Postgres+catalog
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > > >
> >> > > > > --
> >> > > > > Best, Jingsong Lee
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> >
> >> > --
> >> > Best, Jingsong Lee
> >> >
> >>
> >
>
