Josh Bradt created FLINK-12399:
----------------------------------
Summary: FilterableTableSource does not use filters on job run
Key: FLINK-12399
URL: https://issues.apache.org/jira/browse/FLINK-12399
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.8.0
Reporter: Josh Bradt
Attachments: flink-filter-bug.tar.gz
As discussed [on the mailing
list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filter-push-down-not-working-for-a-custom-BatchTableSource-tp27654.html],
there appears to be a bug where a job that uses a custom FilterableTableSource
does not keep the filters that were pushed down into the table source. More
specifically, the table source does receive filters via applyPredicate, and a
new table source with those filters is returned, but the final job graph
appears to use the original table source, which does not contain any filters.
I attached a minimal example program to this ticket. The custom table source is
as follows:
{code:java}
public class CustomTableSource implements BatchTableSource<Model>, FilterableTableSource<Model> {

    private static final Logger LOG = LoggerFactory.getLogger(CustomTableSource.class);

    private final Filter[] filters;

    private final FilterConverter converter = new FilterConverter();

    public CustomTableSource() {
        this(null);
    }

    private CustomTableSource(Filter[] filters) {
        this.filters = filters;
    }

    @Override
    public DataSet<Model> getDataSet(ExecutionEnvironment execEnv) {
        if (filters == null) {
            LOG.info("==== No filters defined ====");
        } else {
            LOG.info("==== Found filters ====");
            for (Filter filter : filters) {
                LOG.info("FILTER: {}", filter);
            }
        }

        return execEnv.fromCollection(allModels());
    }

    @Override
    public TableSource<Model> applyPredicate(List<Expression> predicates) {
        LOG.info("Applying predicates");

        List<Filter> acceptedFilters = new ArrayList<>();
        for (final Expression predicate : predicates) {
            converter.convert(predicate).ifPresent(acceptedFilters::add);
        }

        return new CustomTableSource(acceptedFilters.toArray(new Filter[0]));
    }

    @Override
    public boolean isFilterPushedDown() {
        return filters != null;
    }

    @Override
    public TypeInformation<Model> getReturnType() {
        return TypeInformation.of(Model.class);
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.fromTypeInfo(getReturnType());
    }

    private List<Model> allModels() {
        List<Model> models = new ArrayList<>();
        models.add(new Model(1, 2, 3, 4));
        models.add(new Model(10, 11, 12, 13));
        models.add(new Model(20, 21, 22, 23));
        return models;
    }
}
{code}
When run, it logs
{noformat}
15:24:54,888 INFO com.klaviyo.filterbug.CustomTableSource - Applying predicates
15:24:54,901 INFO com.klaviyo.filterbug.CustomTableSource - Applying predicates
15:24:54,910 INFO com.klaviyo.filterbug.CustomTableSource - Applying predicates
15:24:54,977 INFO com.klaviyo.filterbug.CustomTableSource - ==== No filters defined ====
{noformat}
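which appears to indicate that although filters are pushed down (applyPredicate
is called three times), getDataSet runs on a table source instance whose filters
are still null, so the final job does not use them.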
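
To make the symptom concrete without any Flink dependency, here is a minimal sketch of the same pattern. Everything in it (the FilterPushDownSketch class, the nested Source type, and the use of String as a stand-in for Filter) is hypothetical and only mirrors the shape of CustomTableSource above; it is not Flink code and says nothing about why the planner keeps the original instance.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified stand-in for the reported behavior; this is not Flink code. */
public class FilterPushDownSketch {

    /** Hypothetical immutable source that mirrors the shape of CustomTableSource. */
    static final class Source {
        // null means that no filters were ever pushed into this instance
        private final String[] filters;

        Source() {
            this(null);
        }

        private Source(String[] filters) {
            this.filters = filters;
        }

        /** Returns a NEW source that holds the filters; this instance is left unchanged. */
        Source applyPredicate(List<String> predicates) {
            return new Source(predicates.toArray(new String[0]));
        }

        boolean isFilterPushedDown() {
            return filters != null;
        }
    }

    public static void main(String[] args) {
        Source original = new Source();
        Source pushedDown = original.applyPredicate(Arrays.asList("a > 5"));

        // A caller that keeps using `original` never sees the filters,
        // which matches the "No filters defined" line in the log above.
        System.out.println("original has filters:   " + original.isFilterPushedDown());
        System.out.println("pushedDown has filters: " + pushedDown.isFilterPushedDown());
    }
}
```

Because applyPredicate returns a copy rather than mutating the receiver, the filters exist only on the returned instance. If the final job graph is built from the original instance, as the log suggests, the filters are lost.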