[
https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-35560:
-----------------------------------
Labels: pull-request-available (was: )
> Add query validator support to flink sql gateway via spi pattern
> ----------------------------------------------------------------
>
> Key: FLINK-35560
> URL: https://issues.apache.org/jira/browse/FLINK-35560
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Gateway
> Reporter: dongwoo.kim
> Priority: Major
> Labels: pull-request-available
>
> h3. Summary
> Hello I'd like to suggest query validator support in flink sql gateway via
> spi pattern.
> As an sql gateway operator, there is need for query validation to only
> execute safe queries and drop unsafe queries.
> To address this need, I propose adding a {{QueryValidator}} interface in
> flink sql gateway api package.
> This interface will allow users to implement their own query validation
> logic, providing benefits to flink sql gateway operators.
> h3. Interface
> Below is a draft for the interface.
> It takes Operation and check whether the query is valid or not.
> {code:java}
> package org.apache.flink.table.gateway.api.validator;
> import org.apache.flink.annotation.Public;
> import org.apache.flink.table.operations.Operation;
> /**
> * Interface for implementing a validator that checks the safety of executing
> queries.
> */
> @Public
> public interface QueryValidator {
> boolean validateQuery(Operation op);
> }
> {code}
> h3. Example implementation
> Below is an example implementation that inspects Kafka table options,
> specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value
> is too small, which can cause high disk I/O load.
> {code:java}
> package org.apache.flink.table.gateway.api.validator;
> import org.apache.flink.table.gateway.api.validator.QueryValidator;
> import org.apache.flink.table.operations.Operation;
> import org.apache.flink.table.operations.ddl.CreateTableOperation;
> public class KafkaTimestampValidator implements QueryValidator {
> private static final long ONE_DAY = 24 * 60 * 60 * 1000L;
> @Override
> public boolean validateQuery(Operation op) {
> if (op instanceof CreateTableOperation) {
> CreateTableOperation createTableOp = (CreateTableOperation) op;
> String connector =
> createTableOp.getCatalogTable().getOptions().get("connector");
> if ("kafka".equals(connector)) {
> String startupTimestamp =
> createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis");
> if (startupTimestamp != null &&
> Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) {
> return false;
> }
> }
> }
> return true;
> }
> }{code}
> I'd be happy to implement this feature, if we can reach on agreement.
> Thanks.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)