[
https://issues.apache.org/jira/browse/FLINK-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rong Rong updated FLINK-12399:
------------------------------
Fix Version/s: 1.9.2
> FilterableTableSource does not use filters on job run
> -----------------------------------------------------
>
> Key: FLINK-12399
> URL: https://issues.apache.org/jira/browse/FLINK-12399
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.8.0
> Reporter: Josh Bradt
> Assignee: Rong Rong
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.10.0, 1.9.2
>
> Attachments: flink-filter-bug.tar.gz
>
> Time Spent: 40m
> Remaining Estimate: 0h
>
> As discussed [on the mailing
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filter-push-down-not-working-for-a-custom-BatchTableSource-tp27654.html],
> there appears to be a bug where a job that uses a custom
> FilterableTableSource does not keep the filters that were pushed down into
> the table source. More specifically, the table source does receive filters
> via applyPredicate, and a new table source with those filters is returned,
> but the final job graph appears to use the original table source, which does
> not contain any filters.
> I attached a minimal example program to this ticket. The custom table source
> is as follows:
> {code:java}
> public class CustomTableSource implements BatchTableSource<Model>, FilterableTableSource<Model> {
>
>     private static final Logger LOG = LoggerFactory.getLogger(CustomTableSource.class);
>
>     private final Filter[] filters;
>     private final FilterConverter converter = new FilterConverter();
>
>     public CustomTableSource() {
>         this(null);
>     }
>
>     private CustomTableSource(Filter[] filters) {
>         this.filters = filters;
>     }
>
>     @Override
>     public DataSet<Model> getDataSet(ExecutionEnvironment execEnv) {
>         if (filters == null) {
>             LOG.info("==== No filters defined ====");
>         } else {
>             LOG.info("==== Found filters ====");
>             for (Filter filter : filters) {
>                 LOG.info("FILTER: {}", filter);
>             }
>         }
>         return execEnv.fromCollection(allModels());
>     }
>
>     @Override
>     public TableSource<Model> applyPredicate(List<Expression> predicates) {
>         LOG.info("Applying predicates");
>         List<Filter> acceptedFilters = new ArrayList<>();
>         for (final Expression predicate : predicates) {
>             converter.convert(predicate).ifPresent(acceptedFilters::add);
>         }
>         return new CustomTableSource(acceptedFilters.toArray(new Filter[0]));
>     }
>
>     @Override
>     public boolean isFilterPushedDown() {
>         return filters != null;
>     }
>
>     @Override
>     public TypeInformation<Model> getReturnType() {
>         return TypeInformation.of(Model.class);
>     }
>
>     @Override
>     public TableSchema getTableSchema() {
>         return TableSchema.fromTypeInfo(getReturnType());
>     }
>
>     private List<Model> allModels() {
>         List<Model> models = new ArrayList<>();
>         models.add(new Model(1, 2, 3, 4));
>         models.add(new Model(10, 11, 12, 13));
>         models.add(new Model(20, 21, 22, 23));
>         return models;
>     }
> }
> {code}
>
> When run, it logs
> {noformat}
> 15:24:54,888 INFO com.klaviyo.filterbug.CustomTableSource - Applying predicates
> 15:24:54,901 INFO com.klaviyo.filterbug.CustomTableSource - Applying predicates
> 15:24:54,910 INFO com.klaviyo.filterbug.CustomTableSource - Applying predicates
> 15:24:54,977 INFO com.klaviyo.filterbug.CustomTableSource - ==== No filters defined ====
> {noformat}
> which appears to indicate that although filters are getting pushed down, the
> final job does not use them.
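
The reported symptom can be pictured with a small, dependency-free sketch. It is not Flink code and does not model the planner or claim to identify the root cause. FilterPushDownSketch, SketchSource, planKeepingOriginal and planUsingCopy are hypothetical names, and predicates are plain strings rather than Flink Expression objects. It only shows why a source whose applyPredicate returns a copy reports no filters when the original instance is the one that ends up being executed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class FilterPushDownSketch {

    /** Hypothetical stand-in for a filterable table source; not a Flink class. */
    public static final class SketchSource {
        // null means that no filters were pushed down, as in the reported source.
        private final List<String> filters;

        public SketchSource() {
            this(null);
        }

        private SketchSource(List<String> filters) {
            this.filters = filters;
        }

        /** Like the reported applyPredicate: returns a copy and leaves this instance unchanged. */
        public SketchSource applyPredicate(List<String> predicates) {
            return new SketchSource(Collections.unmodifiableList(new ArrayList<>(predicates)));
        }

        public boolean isFilterPushedDown() {
            return filters != null;
        }
    }

    /** Symptom from the report: the copy is created but the original instance is executed. */
    public static SketchSource planKeepingOriginal(SketchSource source, List<String> predicates) {
        source.applyPredicate(predicates); // result is discarded
        return source;
    }

    /** Expected behaviour: the copy returned by applyPredicate replaces the original. */
    public static SketchSource planUsingCopy(SketchSource source, List<String> predicates) {
        return source.applyPredicate(predicates);
    }

    public static void main(String[] args) {
        List<String> predicates = Collections.singletonList("a > 5");
        System.out.println(planKeepingOriginal(new SketchSource(), predicates).isFilterPushedDown());
        System.out.println(planUsingCopy(new SketchSource(), predicates).isFilterPushedDown());
    }
}
```

In this sketch the first call corresponds to the "No filters defined" log line above, and the second to the behaviour the reporter expected.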
--
This message was sent by Atlassian Jira
(v8.3.4#803005)