Thanks for updating the FLIP and starting a VOTE thread. I have two last-minute questions before I cast my vote:

It seems the definition of TEMPORARY SYSTEM CONNECTION is not precise enough. From my point of view, system connections are single-part identifiers (similar to functions), and catalog connections are 3-part identifiers.

So will we support the following DDL:

CREATE TEMPORARY SYSTEM CONNECTION global_connection WITH (...);

CREATE TABLE t (...) USING CONNECTION global_connection

I think we should. But then we need to update the `CatalogTable.getConnection(): Optional<ObjectIdentifier>` interface, because ObjectIdentifier currently only supports 3-part identifiers.

I suggest we modify ObjectIdentifier and let it have either 3 or 1 part. Then we can also remove the need for FunctionIdentifier which exists for exactly this purpose.
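
A rough sketch of what an identifier with either 1 or 3 parts could look like. This is not the actual Flink `ObjectIdentifier`; the class name `FlexibleIdentifier` and all of its methods are made up for illustration only:

```java
import java.util.List;
import java.util.Objects;

/** Hypothetical identifier holding either 1 part (system) or 3 parts (catalog object). */
final class FlexibleIdentifier {
    private final String catalog;  // null for 1-part (system) identifiers
    private final String database; // null for 1-part (system) identifiers
    private final String name;

    private FlexibleIdentifier(String catalog, String database, String name) {
        this.catalog = catalog;
        this.database = database;
        this.name = Objects.requireNonNull(name, "name");
    }

    /** 1-part identifier, e.g. for a TEMPORARY SYSTEM CONNECTION. */
    static FlexibleIdentifier ofSystem(String name) {
        return new FlexibleIdentifier(null, null, name);
    }

    /** 3-part identifier, e.g. for a catalog connection. */
    static FlexibleIdentifier of(String catalog, String database, String name) {
        return new FlexibleIdentifier(
                Objects.requireNonNull(catalog, "catalog"),
                Objects.requireNonNull(database, "database"),
                name);
    }

    boolean isSystem() {
        return catalog == null;
    }

    List<String> toList() {
        return isSystem() ? List.of(name) : List.of(catalog, database, name);
    }

    String asSummaryString() {
        return String.join(".", toList());
    }
}
```

With something like this, `FlexibleIdentifier.ofSystem("global_connection")` and `FlexibleIdentifier.of("mycat", "mydb", "mysql_prod")` could both be returned from the same accessor.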

What do you think?

Cheers,
Timo


On 30.07.25 05:36, Leonard Xu wrote:
Thanks Hao for the effort to push the FLIP forward. I believe the current design
would satisfy most use cases.

+1 to start a vote.

Best,
Leonard

On Jul 30, 2025, at 02:27, Hao Li <h...@confluent.io.INVALID> wrote:

Sorry, I forgot to mention that I also updated the FLIP to include the Table APIs
for connections. They were originally in the examples but not in the public API
section.

On Tue, Jul 29, 2025 at 10:12 AM Hao Li <h...@confluent.io> wrote:

Hi Leonard,

Thanks for the feedback and offline sync yesterday.

I think connection is a very common need for most catalogs like
MySQLCatalog, KafkaCatalog, HiveCatalog and so on; all of these catalogs need
a connection.
I added a `TEMPORARY SYSTEM` connection, so it's a global-level connection
which can be used for catalog creation. After syncing with Timo, we propose
to first store it in memory like `TEMPORARY SYSTEM FUNCTION`, since this
FLIP is already introducing lots of concepts and interfaces. We can provide
a `SYSTEM` connection and related interfaces to persist it in follow-up
FLIPs.

In this case, I think reducing connector development cost is more
important than making it explicit; the connector knows which options are
sensitive.
Sure. I updated the FLIP to merge connection options with table options so
it's easier for current connectors.
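
A minimal sketch of such a merge, for illustration only. The class `OptionMerger` is hypothetical, and rejecting conflicting keys is an assumption of this sketch, not something the FLIP specifies:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper; not part of Flink or the FLIP. */
final class OptionMerger {

    /**
     * Merges resolved connection options into the table's WITH options.
     * Assumption: a key set to different values on both sides is an error,
     * so that neither side silently overrides the other.
     */
    static Map<String, String> merge(
            Map<String, String> tableOptions, Map<String, String> connectionOptions) {
        Map<String, String> merged = new LinkedHashMap<>(tableOptions);
        for (Map.Entry<String, String> e : connectionOptions.entrySet()) {
            // putIfAbsent returns the existing value if the key was already present
            String previous = merged.putIfAbsent(e.getKey(), e.getValue());
            if (previous != null && !previous.equals(e.getValue())) {
                throw new IllegalArgumentException(
                        "Option '" + e.getKey() + "' is set in both the table and the connection.");
            }
        }
        return merged;
    }
}
```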

I hope the BasicConnectionFactory can be a common one that serves the most
common use cases; otherwise, encrypting all options is a good idea.
I updated to `DefaultConnectionFactory` which handles most of the secret
keys.

Thanks,
Hao

On Mon, Jul 28, 2025 at 6:13 AM Leonard Xu <xbjt...@gmail.com> wrote:

Hey, Hao

Please see my comments as follows:

2. I think we can also modify the create catalog ddl syntax.

```
CREATE CATALOG cat USING CONNECTION mycat.mydb.mysql_prod
WITH (
   'type' = 'jdbc'
);
```

Does `mycat.mydb.mysql_prod` exist in another catalog? This feels like a
chicken-and-egg problem. I think for connections to be used for catalog
creation, they need to be system connections, similar to system functions,
which exist outside of CatalogManager. Maybe we can defer this to later
functionality?

I think connection is a very common need for most catalogs like
MySQLCatalog, KafkaCatalog, HiveCatalog and so on; all of these catalogs need
a connection.


3. It seems the connector factory should merge the WITH options and
connection options together and then create the source/sink. It would be
better if the framework merged all these options so that connectors don't
need any code.

I think it's better to separate connections from table options and make it
explicit. The reason is that the connection here should be a decrypted one.
It's sensitive and should be handled carefully regarding logging, usage, etc.
Mixing it with the original table options makes that hard to do. But the FLIP
used `CatalogConnection`, which is an encrypted one. I'll update it to
`SensitiveConnection`.
(4) Framework-level Resolution: +1 to Shengkai's point about having the
framework (DynamicTableFactory) return complete options to reduce connector
adaptation cost.

Please see my explanation for Shengkai's similar question.

In this case, I think reducing connector development cost is more
important than making it explicit; the connector knows which options are
sensitive.


4. Why do we need to introduce ConnectionFactory? I think a connection is like
a CatalogTable. It should hold the basic information, and all information in
the connection should be stored in the secret store.

The main reason is to enable user-defined connection handling for different
types. For example, if the connection type is `basic`, the corresponding
factory can handle basic-type secrets (e.g. extract username/password from
the connection options and do encryption).

(2) Configurability of SECRET_FIELDS: Could the hardcoded SECRET_FIELDS
in BasicConnectionFactory be made configurable (e.g., 'token' vs
'accessKey') for better connector compatibility?

This depends on the `ConnectionFactory` implementation and can be defined
by the user.

I hope the BasicConnectionFactory can be a common one that serves the most
common use cases; otherwise, encrypting all options is a good idea.

Btw, I also want to push the FLIP forward and start a vote ASAP, so a
meeting is welcome if you think it can help finalize the discussion
thread.


Best,
Leonard




Hi friends

I like the updated FLIP goals, that’s what I want. I’ve some feedback:

(1) Minor: Interface Hierarchy: Why doesn't WritableSecretStore extend
SecretStore?
(2) Configurability of SECRET_FIELDS: Could the hardcoded SECRET_FIELDS
in BasicConnectionFactory be made configurable (e.g., 'token' vs
'accessKey') for better connector compatibility?
(3) Inconsistent Return Types: ConnectionFactory#resolveConnection returns
SensitiveConnection, while BasicConnectionFactory#resolveConnection returns
Map<String, String>. Should these be aligned?
(4) Framework-level Resolution: +1 to Shengkai's point about having the
framework (DynamicTableFactory) return complete options to reduce connector
adaptation cost.
(5) Secret ID Handling: When no encryption is needed, secretId is null
(from secrets.isEmpty() ? null : secretStore.storeSecret(secrets)). This
behavior should be explicitly documented in the interfaces.

Best,
Leonard

On Jul 24, 2025, at 11:44, Shengkai Fang <fskm...@gmail.com> wrote:

hi.

Sorry for the late reply. I just have some questions:

1. Why does SecretStoreFactory#open throw a CatalogException? I think the
external system cannot handle this exception.

2. I think we can also modify the create catalog ddl syntax.

```
CREATE CATALOG cat USING CONNECTION mycat.mydb.mysql_prod
WITH (
  'type' = 'jdbc'
);
```

3. It seems the connector factory should merge the WITH options and
connection options together and then create the source/sink. It would be
better if the framework merged all these options so that connectors don't
need any code.

4. Why do we need to introduce ConnectionFactory? I think a connection is like
a CatalogTable. It should hold the basic information, and all information in
the connection should be stored in the secret store.


Best,
Shengkai


On Tue, Jul 22, 2025 at 22:04, Timo Walther <twal...@apache.org> wrote:

Hi Mayank,

Thanks for updating the FLIP and clearly documenting our discussion.

+1 for moving forward with the vote, unless there are objections from
others.

Cheers,
Timo

On 22.07.25 02:14, Mayank Juneja wrote:
Hi Ryan and Austin,

Thanks for your suggestions. I've updated the FLIP with the following
additional info -

1. *table.secret-store.kind* key to register the SecretStore in a yaml file
2. *updateSecret* method in WritableSecretStore interface

Thanks,
Mayank

On Thu, Jul 17, 2025 at 5:42 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:

Hey all,

Thanks for the nice FLIP, all! I’m just reading through – had one question
on the ALTER CONNECTION implementation flow. Would it make sense for the
WritableSecretStore to expose a method for updating a secret by ID, so it
can be done atomically? Otherwise, would we need to call delete and create
again, potentially introducing concurrent resolution errors?
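
To illustrate the difference, here is a minimal in-memory sketch of an atomic update. Everything in it is hypothetical: `InMemorySecretStore` is not a Flink class, and the method names other than `storeSecret` and `updateSecret` (which are mentioned in this thread) are assumptions made for the example:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory store; a real one would delegate to an external secret manager. */
final class InMemorySecretStore {
    private final ConcurrentHashMap<String, Map<String, String>> secrets =
            new ConcurrentHashMap<>();

    String storeSecret(Map<String, String> secret) {
        String id = UUID.randomUUID().toString();
        secrets.put(id, Map.copyOf(secret));
        return id;
    }

    Map<String, String> getSecret(String id) {
        Map<String, String> secret = secrets.get(id);
        if (secret == null) {
            throw new IllegalArgumentException("Unknown secret id: " + id);
        }
        return secret;
    }

    /**
     * Swaps the value under an existing id in one atomic step, so a concurrent
     * reader sees either the old or the new secret, never a missing one
     * (which a delete followed by a create could expose).
     */
    void updateSecret(String id, Map<String, String> newSecret) {
        // ConcurrentHashMap.replace only writes if the key is currently mapped
        if (secrets.replace(id, Map.copyOf(newSecret)) == null) {
            throw new IllegalArgumentException("Unknown secret id: " + id);
        }
    }
}
```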

Best,
Austin

On Thu, Jul 17, 2025 at 13:07 Ryan van Huuksloot <ryan.vanhuuksl...@shopify.com.invalid> wrote:

Hi Mayank,

Thanks for updating the FLIP. Overall it looks good to me.

One question I had is how someone could choose the SecretStore they
want to use if they use something like the SQL Gateway as the entrypoint on
top of a remote Session cluster. I don't see an explicit way to set the
SecretStore in the FLIP.
I assume we'll do it similarly to the CatalogStore, but I wanted to call this
out.

table.catalog-store.kind: file
table.catalog-store.file.path: file:///path/to/catalog/store/

Ryan van Huuksloot
Staff Engineer, Infrastructure | Streaming Platform

On Wed, Jul 16, 2025 at 2:22 PM Mayank Juneja <mayankjunej...@gmail.com> wrote:

Hi everyone,

Thanks for your valuable inputs. I have updated the FLIP with the ideas
proposed earlier in the thread. Looking forward to your feedback.
https://cwiki.apache.org/confluence/x/cYroF

Best,
Mayank

On Fri, Jun 27, 2025 at 2:59 AM Leonard Xu <xbjt...@gmail.com> wrote:

Quick response: thanks Mayank, Hao and Timo for the effort. The new
proposal looks good, +1 from my side.

Could you update the current FLIP doc so that we can have more specific
discussions later?


Best,
Leonard


On Jun 26, 2025, at 15:06, Timo Walther <twal...@apache.org> wrote:

Hi everyone,

sorry for the late reply, feature freeze kept me busy. Mayank, Hao and I
synced offline and came up with an improved proposal. Before we update the
FLIP, let me summarize the most important key facts that hopefully address
most concerns:

1) SecretStore
- Similar to CatalogStore, we introduce a SecretStore as the highest
level in TableEnvironment.
- SecretStore is initialized with options and potentially environment
variables, including EnvironmentSettings.withSecretStore(SecretStore).
- The SecretStore is pluggable and discovered using the regular
factory approach.
- For example, it could implement Azure Key Vault or other cloud
provider secret stores.
- Goal: Flink and Flink catalogs do not have to deal with sensitive data.

2) Connections
- Connections are catalog objects identified with 3-part identifiers.
3-part identifiers are crucial for manageability of larger projects and
align with existing catalog objects.
- They contain connection details, e.g. URL, query parameters, and other
configuration.
- They do not contain secrets, but only pointers to secrets in the
SecretStore.

3) Connection DDL

CREATE [TEMPORARY] CONNECTION mycat.mydb.OpenAPI WITH (
'type' = 'basic' | 'bearer' | 'jwt' | 'oauth' | ...,
...
)

- Connection type is pluggable and discovered using the regular
factory approach.
- The factory extracts secrets and puts them into the SecretStore.
- The factory leaves only non-confidential options that can be
stored in a catalog.

When executing:
CREATE [TEMPORARY] CONNECTION mycat.mydb.OpenAPI WITH (
'type' = 'basic',
'url' = 'api.example.com',
'username' = 'bob',
'password' = 'xyz'
)

The catalog will receive something similar to:
CREATE [TEMPORARY] CONNECTION mycat.mydb.OpenAPI WITH (
'type' = 'basic',
'url' = 'api.example.com',
'secret.store' = 'azure-key-vault',
'secret.id' = 'secretId'
)

- However, the exact property design is up to the connection factory.
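
The split performed by a `basic` factory in the example above could look roughly like the following sketch. It is illustrative only: `BasicConnectionSketch`, the `SECRET_FIELDS` contents, and the `storeSecret` callback are assumptions, and omitting `secret.id` when there is nothing to store is just one possible choice:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical sketch of a 'basic' connection factory; not the FLIP's actual interface. */
final class BasicConnectionSketch {
    // Assumption: these option keys are treated as confidential.
    static final Set<String> SECRET_FIELDS = Set.of("username", "password");

    /**
     * Moves confidential options into the secret store (modelled here as a
     * callback returning a secret id) and returns what the catalog may persist.
     */
    static Map<String, String> toCatalogOptions(
            Map<String, String> userOptions,
            Function<Map<String, String>, String> storeSecret) {
        Map<String, String> secrets = new HashMap<>();
        Map<String, String> catalogOptions = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : userOptions.entrySet()) {
            if (SECRET_FIELDS.contains(e.getKey())) {
                secrets.put(e.getKey(), e.getValue());
            } else {
                catalogOptions.put(e.getKey(), e.getValue());
            }
        }
        // Only reference a secret if something was actually stored.
        if (!secrets.isEmpty()) {
            catalogOptions.put("secret.id", storeSecret.apply(secrets));
        }
        return catalogOptions;
    }
}
```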

4) Connection Usage

CREATE TABLE t (...) USING CONNECTION mycat.mydb.OpenAPI;

- MODEL, FUNCTION, TABLE DDL will support the USING CONNECTION keyword,
similar to BigQuery.
- The connection will be provided in a table/model provider/function
definition factory.

5) CatalogStore / Catalog Initialization

Catalog store or catalog can make use of SecretStore to retrieve initial
credentials for bootstrapping. All objects lower than catalog store/catalog
can then use connections. If you think we still need system-level
connections, we can support CREATE SYSTEM CONNECTION GlobalName WITH (..),
similar to SYSTEM functions, stored directly in a ConnectionManager in
TableEnvironment. But for now I would suggest to start simple with
per-catalog connections and later evolve the design.

Dealing with secrets is a very sensitive topic and I'm clearly not an
expert on it. This is why we should try to push the problem to existing
solutions and not start storing secrets in Flink in any way. Thus, the
interfaces will be defined very generically.

Looking forward to your feedback.

Cheers,
Timo





On 09.06.25 04:01, Leonard Xu wrote:
Thanks Timo for joining this thread.

I agree that this feature is needed by the community; the current
disagreement is only about the implementation method or solution.

Your thoughts look generally good to me, looking forward to your proposal.

Best,
Leonard

On Jun 6, 2025, at 22:46, Timo Walther <twal...@apache.org> wrote:

Hi everyone,

thanks for this healthy discussion. Looking at the high number of
participants, it looks like we definitely want this feature. We just need
to figure out the "how".

This reminds me very much of the discussion we had for CREATE
FUNCTION. There, we discussed whether functions should be named globally or
catalog-specific. In the end, we decided for both `CREATE SYSTEM FUNCTION`
and `CREATE FUNCTION`, satisfying both the data platform team of an
organization (which might provide system functions) and individual data
teams or use cases (scoped by catalog/database).

Looking at other modern vendors like Snowflake, there is SECRET (scoped
to schema) [1] and API INTEGRATION [2] (scoped to account). So other
vendors also offer global and per-team / per-use-case connection details.

In general, I think fitting connections into the existing concepts for
catalog objects (with three-part identifiers) makes managing them easier.
But I also see the need for global defaults.

Btw keep in mind that a catalog implementation should only store
metadata. Similar to how a CatalogTable doesn't store the actual data, a
CatalogConnection should not store the credentials. It should only offer a
factory that allows for storing and retrieving them. In real-world
scenarios, a factory is most likely backed by a product like Azure Key
Vault.

So code-wise, having a ConnectionManager that behaves similarly to
FunctionManager sounds reasonable.

+1 for having special syntax instead of using properties. This allows
access to connections in tables, models, functions. And catalogs, if we
agree to have global ones as well.

What do you think?

Let me spend some more thought on this and come back with a concrete
proposal by early next week.

Cheers,
Timo

[1] https://docs.snowflake.com/en/sql-reference/sql/create-secret
[2] https://docs.snowflake.com/en/sql-reference/sql/create-api-integration

On 04.06.25 10:47, Leonard Xu wrote:
Hey, Mayank

Please see my feedback as follows:

1. One of the motivations of this FLIP is to improve security. However, the
current design stores all connection information in the catalog, and each
Flink SQL job reads from the catalog during compilation. The connection
information is passed between SQL Gateway and the catalog in plaintext,
which actually introduces new security risks.

2. The name "Connection" should be changed to something like ConnectionSpec
to clearly indicate that it is an object containing only static properties
without a lifecycle. Putting aside the naming issue, I think the current
model and hierarchy design is somewhat strange. Storing various kinds of
connections (e.g., Kafka, MySQL) in the same Catalog with hierarchical
identifiers like catalog-name.db-name.connection-name raises the following
questions:
(1) What is the purpose of this hierarchical structure of the Connection
object?
(2) If we can use a Connection to create a MySQL table, why can't we use a
Connection to create a MySQL Catalog?

3. Regarding the connector usage examples given in this FLIP:
```sql
1  -- Example 2: Using connection for jdbc tables
2  CREATE OR REPLACE CONNECTION mysql_customer_db
3  WITH (
4    'type' = 'jdbc',
5    'jdbc.url' = 'jdbc:mysql://customer-db.example.com:3306/customerdb',
6    'jdbc.connection.ssl.enabled' = 'true'
7  );
8
9  CREATE TABLE customers (
10   customer_id INT,
11   PRIMARY KEY (customer_id) NOT ENFORCED
12 ) WITH (
13   'connector' = 'jdbc',
14   'jdbc.connection' = 'mysql_customer_db',
15   'jdbc.connection.ssl.enabled' = 'true',
16   'jdbc.connection.max-retry-timeout' = '60s',
17   'jdbc.table-name' = 'customers',
18   'jdbc.lookup.cache' = 'PARTIAL'
19 );
```
I see three issues from SQL semantics and connector compatibility
perspectives:
(1) Look at line 14: `mysql_customer_db` is an object identifier of a
CONNECTION defined in SQL. However, this identifier is referenced via a
string value inside the table’s WITH clause, which feels hacky to me.
(2) Look at lines 14–16: the use of the specific prefix `jdbc.connection`
will confuse users because `connection.xx` may already be used as a prefix
for existing configuration items.
(3) Look at lines 14–18: Why do all existing configuration options need to
be prefixed with `jdbc`, even if they’re not related to Connection
properties? This completely changes user habits. Is it backward compatible?

In my opinion, Connection should be a model independent of both Catalog and
Table, and can be referenced by all catalog/table/udf/model objects. It
should be managed by a component such as a ConnectionManager to enable
reuse. For security purposes, authentication mechanisms could be supported
within the ConnectionManager.

Best,
Leonard

On Jun 4, 2025, at 02:04, Martijn Visser <martijnvis...@apache.org> wrote:

Hi all,

First of all, I think having a Connection resource is something that will
be beneficial for Apache Flink. I could see that being extended in the
future to allow for easier secret handling [1].
In my mind, I'm comparing this proposal against SQL/MED from the ISO
standard [2]. I do think that SQL/MED isn't a very user-friendly syntax
though, looking at Postgres for example [3].

I think it's a valid question whether Connection should be considered with a
catalog or database-level scope. @Ryan can you share something more, since
you've mentioned "Note: I much prefer catalogs for this case. Which is what
we use internally to manage connection properties". It looks like there
isn't a strongly favoured approach looking at other vendors (e.g.
Databricks scopes it on a Unity catalog, Snowflake on a database level).

Also looking forward to Leonard's input.

Best regards,

Martijn

[1] https://issues.apache.org/jira/browse/FLINK-36818
[2] https://www.iso.org/standard/84804.html
[3] https://www.postgresql.org/docs/current/sql-createserver.html

On Fri, May 30, 2025 at 5:07 AM Leonard Xu <xbjt...@gmail.com> wrote:

Hey Mayank.

Thanks for the FLIP. I went through it quickly and found some issues
which I think we need to discuss in depth later. As we’re on a short Dragon
Boat Festival holiday, could you kindly hold on this thread? We will be back
to continue the FLIP discussion.

Best,
Leonard


On Apr 29, 2025, at 23:07, Mayank Juneja <mayankjunej...@gmail.com> wrote:

Hi all,

I would like to open up for discussion a new FLIP-529 [1].

Motivation:
Currently, Flink SQL handles external connectivity by defining endpoints
and credentials in table configuration. This approach prevents reusability
of these connections and makes table definitions less secure by exposing
sensitive information.
We propose the introduction of a new "connection" resource in Flink. This
will be a pluggable resource configured with a remote endpoint and
associated access key. Once defined, connections can be reused across table
definitions, and eventually for model definitions (as discussed in FLIP-437)
for inference, enabling seamless and secure integration with external
systems.
The connection resource will provide a new, optional way to manage external
connectivity in Flink. Existing methods for table definitions will remain
unchanged.

[1] https://cwiki.apache.org/confluence/x/cYroF

Best Regards,
Mayank Juneja







--
*Mayank Juneja*
Product Manager | Data Streaming and AI












