This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 9180ebe8b046eb584deff59bdec54e745d43a9e6 Author: Peeyush Gupta <peeyush.gu...@couchbase.com> AuthorDate: Thu Mar 13 21:56:38 2025 -0700 [ASTERIXDB-3578][EXT] Error with query on delta table with IN predicate - user model changes: no - storage format changes: no - interface changes: no Ext-ref: MB-65533 Change-Id: I7177a10da9145a8de862991b3e57213138dae9b9 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19472 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Peeyush Gupta <peeyush.gu...@couchbase.com> Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> --- .../deltalake-partitioned-file-read.02.query.sqlpp | 22 +++++++++ .../deltalake-partitioned-file-read.03.query.sqlpp | 22 +++++++++ .../deltalake-partitioned-file-read.04.query.sqlpp | 22 +++++++++ .../deltalake-partitioned-file-read.05.query.sqlpp | 24 ++++++++++ .../deltalake-partitioned-file-read.06.query.sqlpp | 24 ++++++++++ .../deltalake-partitioned-file-read.07.query.sqlpp | 24 ++++++++++ .../deltalake-partitioned-file-read.08.query.sqlpp | 25 ++++++++++ .../deltalake-partitioned-file-read.09.query.sqlpp | 24 ++++++++++ .../{read-data.2.adm => read-data.1.adm} | 0 .../read-data.2.adm | 5 +- .../{read-data.2.adm => read-data.3.adm} | 4 +- .../{read-data.2.adm => read-data.4.adm} | 6 ++- .../read-data.5.adm | 2 + .../read-data.6.adm | 13 ++++++ .../read-data.7.adm | 2 + .../read-data.8.adm | 6 +++ .../{read-data.2.adm => read-data.9.adm} | 0 .../reader/aws/delta/DeltaReaderFactory.java | 39 +++++++++------- .../asterix/external/util/ExternalDataUtils.java | 3 +- .../utils/filter/DeltaTableFilterBuilder.java | 54 ++++++++++++++++++---- 20 files changed, 287 insertions(+), 34 deletions(-) diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.02.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.02.query.sqlpp new file mode 100644 index 0000000000..8e3b6b1cb0 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.02.query.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + USE test; + + SELECT element ds FROM DeltalakeDataset as ds WHERE ds.date = "01-01-2025" AND ds.name IN ["Order 1", "Order 3", "Order 4"] order by ds.id; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.03.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.03.query.sqlpp new file mode 100644 index 0000000000..71167a5424 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.03.query.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + USE test; + + SELECT element ds FROM DeltalakeDataset as ds WHERE ds.date = "01-01-2025" and ds.hour in [10, 16, 18] order by ds.id; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.04.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.04.query.sqlpp new file mode 100644 index 0000000000..3bddae5c31 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.04.query.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + USE test; + + SELECT element ds FROM DeltalakeDataset as ds WHERE ds.date = "01-01-2025" or ds.date="01-02-2025" order by ds.id; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.05.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.05.query.sqlpp new file mode 100644 index 0000000000..86206578e9 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.05.query.sqlpp @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + USE test; + + SELECT element ds FROM DeltalakeDataset as ds + WHERE (ds.date > "01-02-2025" and ds.hour = 10) or (ds.date < "01-02-2025" and ds.hour = 15) + order by ds.id; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.06.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.06.query.sqlpp new file mode 100644 index 0000000000..7b88e1f20b --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.06.query.sqlpp @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + USE test; + +SELECT ds.id as id1, ds2.id as id2 FROM DeltalakeDataset as ds, DeltalakeDataset as ds2 +WHERE ds.hour = ds2.hour and ds.date = "01-01-2025" and ds2.date is not null +order by ds.id, ds2.id; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.07.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.07.query.sqlpp new file mode 100644 index 0000000000..647878834f --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.07.query.sqlpp @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + USE test; + + SELECT element ds FROM DeltalakeDataset as ds + WHERE (ds.date > "01-02-2025" and (ds.hour = 10 or ds.hour=16 or ds.hour=12) and ds.id>1) or (ds.date < "01-02-2025" and ds.hour = 15) + order by ds.id; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.08.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.08.query.sqlpp new file mode 100644 index 0000000000..eaf58cd3b5 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.08.query.sqlpp @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + USE test; + + SELECT element ds FROM DeltalakeDataset as ds + WHERE (ds.date >= "01-02-2025" and (ds.hour = 10 or ds.hour=16 or ds.hour=12) and ds.id>1) or + (ds.date < "01-02-2025" and ds.hour = 15) or ds.date =10 + order by ds.id; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.09.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.09.query.sqlpp new file mode 100644 index 0000000000..1a9dd7c40c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.09.query.sqlpp @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + USE test; + + SELECT element ds FROM DeltalakeDataset as ds + WHERE ds.date = 10 or ds.date = "01-01-2025" + order by ds.id; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.1.adm similarity index 100% copy from asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm copy to asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.1.adm diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm index 86d41fe4b0..cf5ecd490b 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm @@ -1,5 +1,2 @@ { "id": 0, "name": "Order 1", "date": "01-01-2025", "hour": 10 } -{ "id": 1, "name": "Order 2", "date": "01-01-2025", "hour": 10 } -{ "id": 2, "name": "Order 3", "date": "01-01-2025", "hour": 10 } -{ "id": 3, "name": "Order 10", "date": "01-01-2025", "hour": 15 } -{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 } \ No newline at end of file +{ "id": 2, "name": "Order 3", "date": "01-01-2025", "hour": 10 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.3.adm similarity index 59% copy from asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm copy to asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.3.adm index 86d41fe4b0..00ba142688 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.3.adm @@ -1,5 +1,3 @@ { "id": 0, "name": "Order 1", "date": "01-01-2025", "hour": 10 } { "id": 1, "name": "Order 2", "date": "01-01-2025", "hour": 10 } -{ "id": 2, "name": "Order 3", "date": "01-01-2025", "hour": 10 } -{ "id": 3, "name": "Order 10", "date": "01-01-2025", "hour": 15 } -{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 } \ No newline at end of file +{ "id": 2, "name": "Order 3", "date": "01-01-2025", "hour": 10 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.4.adm similarity index 55% copy from asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm copy to asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.4.adm index 86d41fe4b0..b02a62d411 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.4.adm @@ -2,4 +2,8 @@ { "id": 1, "name": "Order 2", "date": "01-01-2025", "hour": 10 } { "id": 2, "name": "Order 3", "date": "01-01-2025", "hour": 10 } { "id": 3, "name": "Order 10", "date": "01-01-2025", "hour": 15 } -{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 } \ No newline at end of file +{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 } +{ "id": 5, "name": "Order 21", "date": "01-02-2025", "hour": 12 } +{ "id": 6, "name": "Order 22", "date": "01-02-2025", "hour": 12 } +{ "id": 7, "name": "Order 30", "date": "01-02-2025", "hour": 16 } +{ "id": 8, "name": "Order 31", "date": "01-02-2025", "hour": 16 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.5.adm new file mode 100644 index 0000000000..6fd91f179c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.5.adm @@ -0,0 +1,2 @@ +{ "id": 3, "name": "Order 10", "date": "01-01-2025", "hour": 15 } +{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.6.adm new file mode 100644 index 0000000000..5bfd257349 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.6.adm @@ -0,0 +1,13 @@ +{ "id1": 0, "id2": 0 } +{ "id1": 0, "id2": 1 } +{ "id1": 0, "id2": 2 } +{ "id1": 1, "id2": 0 } +{ "id1": 1, "id2": 1 } +{ "id1": 1, "id2": 2 } +{ "id1": 2, "id2": 0 } +{ "id1": 2, "id2": 1 } +{ "id1": 2, "id2": 2 } +{ "id1": 3, "id2": 3 } +{ "id1": 3, "id2": 4 } +{ "id1": 4, "id2": 3 } +{ "id1": 4, "id2": 4 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.7.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.7.adm new file mode 100644 index 0000000000..6fd91f179c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.7.adm @@ -0,0 +1,2 @@ +{ "id": 3, "name": "Order 10", "date": "01-01-2025", "hour": 15 } +{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.8.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.8.adm new file mode 100644 index 0000000000..e943ba718c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.8.adm @@ -0,0 +1,6 @@ +{ "id": 3, "name": "Order 10", "date": "01-01-2025", "hour": 15 } +{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 } +{ "id": 5, "name": "Order 21", "date": "01-02-2025", "hour": 12 } +{ "id": 6, "name": "Order 22", "date": "01-02-2025", "hour": 12 } +{ "id": 7, "name": "Order 30", "date": "01-02-2025", "hour": 16 } +{ "id": 8, "name": "Order 31", "date": "01-02-2025", "hour": 16 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.9.adm similarity index 100% copy from asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm copy to asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.9.adm diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java index 3c998a5ae9..4e902b9943 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java @@ -62,6 +62,7 @@ import io.delta.kernel.data.FilteredColumnarBatch; import io.delta.kernel.data.Row; import io.delta.kernel.defaults.engine.DefaultEngine; import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.KernelEngineException; import io.delta.kernel.exceptions.KernelException; import io.delta.kernel.expressions.Expression; import io.delta.kernel.expressions.Predicate; @@ -107,7 +108,7 @@ public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object> Snapshot snapshot; try { snapshot = table.getLatestSnapshot(engine); - } catch (KernelException e) { + } catch (KernelException | KernelEngineException e) { LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e); throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e)); } @@ -136,30 +137,36 @@ public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object> scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build(); } scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine)); - CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine); + List<Row> scanFiles; + try { + scanFiles = getScanFiles(scan, engine); + } catch (UnsupportedOperationException | IllegalStateException e) { + // Delta kernel API failed to apply expression due to type mismatch. + // We need to fall back to skip applying the filter and return all files. + LOGGER.info("Exception encountered while getting delta table files to scan {}", e.getMessage()); + scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build(); + scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine)); + scanFiles = getScanFiles(scan, engine); + } + LOGGER.info("Number of delta table parquet data files to scan: {}", scanFiles.size()); + locationConstraints = getPartitions(appCtx); + configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA); + distributeFiles(scanFiles, getPartitionConstraint().getLocations().length); + issueWarnings(warnings, warningCollector); + } + private List<Row> getScanFiles(Scan scan, Engine engine) { List<Row> scanFiles = new ArrayList<>(); + CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine); while (iter.hasNext()) { - FilteredColumnarBatch batch = null; - try { - batch = iter.next(); - } catch (UnsupportedOperationException e) { - // Failed to apply expression due to type mismatch. We can skip the files where partitioned column - // type is different from the type of value provided in the predicate - LOGGER.info("Unsupported operation {}", e.getMessage()); - continue; - } + FilteredColumnarBatch batch = iter.next(); CloseableIterator<Row> rowIter = batch.getRows(); while (rowIter.hasNext()) { Row row = rowIter.next(); scanFiles.add(row); } } - LOGGER.info("Number of files to scan: {}", scanFiles.size()); - locationConstraints = getPartitions(appCtx); - configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA); - distributeFiles(scanFiles, getPartitionConstraint().getLocations().length); - issueWarnings(warnings, warningCollector); + return scanFiles; } private void issueWarnings(List<Warning> warnings, IWarningCollector warningCollector) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index 82b5dad645..fac11990da 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -120,6 +120,7 @@ import org.apache.logging.log4j.Logger; import io.delta.kernel.defaults.engine.DefaultEngine; import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.KernelEngineException; import io.delta.kernel.exceptions.KernelException; public class ExternalDataUtils { @@ -540,7 +541,7 @@ public class ExternalDataUtils { io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath); try { table.getLatestSnapshot(engine); - } catch (KernelException e) { + } catch (KernelException | KernelEngineException e) { LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e); throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e)); } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java index 73ed81e0c7..112dba224b 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java @@ -35,6 +35,7 @@ import org.apache.asterix.om.base.AString; import org.apache.asterix.om.constants.AsterixConstantValue; import org.apache.asterix.om.functions.IFunctionDescriptor; import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo; import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -48,8 +49,7 @@ import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; import org.apache.logging.log4j.LogManager; - -import com.microsoft.azure.storage.core.Logger; +import org.apache.logging.log4j.Logger; import io.delta.kernel.expressions.Column; import io.delta.kernel.expressions.Expression; @@ -58,7 +58,7 @@ import io.delta.kernel.expressions.Predicate; public class DeltaTableFilterBuilder extends AbstractFilterBuilder { - private static final org.apache.logging.log4j.Logger LOGGER = LogManager.getLogger(); + private static final Logger LOGGER = LogManager.getLogger(); public DeltaTableFilterBuilder(ExternalDatasetProjectionFiltrationInfo projectionFiltrationInfo, JobGenContext context, IVariableTypeEnvironment typeEnv) { @@ -72,7 +72,7 @@ public class DeltaTableFilterBuilder extends AbstractFilterBuilder { try { deltaTablePredicate = createExpression(filterExpression); } catch (Exception e) { - LOGGER.error("Error creating DeltaTable filter expression ", e); + LOGGER.error("Error creating DeltaTable filter expression, skipping filter pushdown", e); } } if (deltaTablePredicate != null && !(deltaTablePredicate instanceof Predicate)) { @@ -138,12 +138,16 @@ public class DeltaTableFilterBuilder extends AbstractFilterBuilder { private Expression handleFunction(ILogicalExpression expr) throws AlgebricksException { AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr; IFunctionDescriptor fd = resolveFunction(funcExpr); - List<Expression> args = handleArgs(funcExpr); FunctionIdentifier fid = fd.getIdentifier(); + if (funcExpr.getArguments().size() != 2 + && !(fid.equals(AlgebricksBuiltinFunctions.AND) || fid.equals(AlgebricksBuiltinFunctions.OR))) { + throw new RuntimeException("Predicate should only have 2 arguments: " + funcExpr); + } + List<Expression> args = handleArgs(funcExpr); if (fid.equals(AlgebricksBuiltinFunctions.AND)) { - return new Predicate("AND", args); + return createAndOrPredicate("AND", args, 0); } else if (fid.equals(AlgebricksBuiltinFunctions.OR)) { - return new Predicate("OR", args); + return createAndOrPredicate("OR", args, 0); } else if (fid.equals(AlgebricksBuiltinFunctions.EQ)) { return new Predicate("=", args); } else if (fid.equals(AlgebricksBuiltinFunctions.GE)) { @@ -173,8 +177,40 @@ public class DeltaTableFilterBuilder extends AbstractFilterBuilder { protected Column createColumnExpression(ILogicalExpression expression) { ARecordType path = filterPaths.get(expression); if (path.getFieldNames().length != 1) { - throw new RuntimeException("Unsupported expression: " + expression); + throw new RuntimeException("Unsupported column expression: " + expression); + } else if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.OBJECT) { + // The field could be a nested field + List<String> fieldList = new ArrayList<>(); + fieldList = createPathExpression(path, fieldList); + return new Column(fieldList.toArray(new String[0])); + } else if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.ANY) { + return new Column(path.getFieldNames()[0]); + } else { + throw new RuntimeException("Unsupported column expression: " + expression); + } + } + + private List<String> createPathExpression(ARecordType path, List<String> fieldList) { + if (path.getFieldNames().length != 1) { + throw new RuntimeException("Error creating column expression"); + } else { + fieldList.add(path.getFieldNames()[0]); + } + if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.OBJECT) { + return createPathExpression((ARecordType) path.getFieldTypes()[0], fieldList); + } else if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.ANY) { + return fieldList; + } else { + throw new RuntimeException("Error creating column expression"); + } + } + + // Converts or(pred1, pred2, pred3) to or(pred1, or(pred2, pred3)) + private Predicate createAndOrPredicate(String function, List<Expression> args, int index) { + if (index == args.size() - 2) { + return new Predicate(function, args.get(index), args.get(index + 1)); + } else { + return new Predicate(function, args.get(index), createAndOrPredicate(function, args, index + 1)); } - return new Column(path.getFieldNames()[0]); } }