Repository: asterixdb Updated Branches: refs/heads/master 8cbb05cec -> de0ece7f1
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.5.server.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.5.server.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.5.server.sqlpp new file mode 100644 index 0000000..aacaeaf --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.5.server.sqlpp @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + stop 10001 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.6.update.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.6.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.6.update.sqlpp new file mode 100644 index 0000000..cc12612 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.6.update.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use test; +DELETE FROM Tweet t WHERE t.id = 668945640186101761; + http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.7.server.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.7.server.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.7.server.sqlpp new file mode 100644 index 0000000..9405846 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.7.server.sqlpp @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Description : Continue ingest 10,000 records + */ + +start client 10001 file-client 127.0.0.1 ../asterix-app/data/twitter/real.2.adm 500 50 1000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.8.sleep.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.8.sleep.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.8.sleep.sqlpp new file mode 100644 index 0000000..c0e90c8 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.8.sleep.sqlpp @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +5000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.9.server.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.9.server.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.9.server.sqlpp new file mode 100644 index 0000000..aacaeaf --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.9.server.sqlpp @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + stop 10001 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.1.ddl.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.1.ddl.sqlpp new file mode 100644 index 0000000..417f841 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.1.ddl.sqlpp @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * This test case verify if the filter optimization rule is still correct when there + * is one upsert on some of the component. The new value should be returned. + * + * 1. create the dataset Tweet that ingested by a feed. + * 2. update one tweet + * 3. start the feed again to make the previous component flush to disk + * 4. send the query by the old value to see if any record returns + */ + + +drop dataverse test if exists; +create dataverse test if not exists; +use test; + +create type typeUser if not exists as open { + id: int64, + name: string, + screen_name : string, + lang : string, + location: string, + create_at: date, + description: string, + followers_count: int32, + friends_count: int32, + statues_count: int64 +} + +create type typePlace if not exists as open{ + country : string, + country_code : string, + full_name : string, + id : string, + name : string, + place_type : string, + bounding_box : rectangle +} + +create type typeGeoTag if not exists as open { + stateID: int32, + stateName: string, + countyID: int32, + countyName: string, + cityID: int32?, + cityName: string? +} + +create type typeTweet if not exists as open{ + create_at : datetime, + id: int64, + `text`: string, + in_reply_to_status : int64, + in_reply_to_user : int64, + favorite_count : int64, + coordinate: point?, + retweet_count : int64, + lang : string, + is_retweet: boolean, + hashtags : {{ string }} ?, + user_mentions : {{ int64 }} ? , + user : typeUser, + place : typePlace?, + geo_tag: typeGeoTag +} + +create dataset Tweet(typeTweet) primary key id +using compaction policy prefix (("max-mergable-component-size"="32768"),("max-tolerance-component-count"="32")) +with filter on create_at; + +create index text_idx if not exists on Tweet(`text`) type btree; +create index state_idx if not exists on Tweet(geo_tag.stateID) type btree; + +create feed TweetFeed using socket_adapter +( + ("sockets"="127.0.0.1:10001"), + ("address-type"="IP"), + ("type-name"="typeTweet"), + ("format"="adm") +); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.10.query.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.10.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.10.query.sqlpp new file mode 100644 index 0000000..4846e8c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.10.query.sqlpp @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use test; + +SELECT value m.geo_tag.stateID from Tweet m +WHERE m.`text` = "Just posted a photo @ Campus Martius Park https://t.co/5Ax4E2CdWZ" +; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.11.ddl.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.11.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.11.ddl.sqlpp new file mode 100644 index 0000000..da9bd3a --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.11.ddl.sqlpp @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Description : Create a socket feed with a client that pushes + * 10 records. The feed is connected to a dataset that is then + * queried for the data. + * Expected Res : Success + * Date : 24th Feb 2016 + */ + +drop dataverse test; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.2.update.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.2.update.sqlpp new file mode 100644 index 0000000..54a8a2c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.2.update.sqlpp @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use test; + +set `wait-for-completion-feed` "false"; + +connect feed TweetFeed to dataset Tweet; + +start feed TweetFeed; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.3.server.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.3.server.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.3.server.sqlpp new file mode 100644 index 0000000..22fbc4a --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.3.server.sqlpp @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Description : Create a socket feed with a client that pushes + * 1000 records. The feed is connected to a dataset that is then + * queried for the data. + * Expected Res : Success + */ + +start client 10001 file-client 127.0.0.1 ../asterix-app/data/twitter/real.adm 1000 100 900 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.4.sleep.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.4.sleep.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.4.sleep.sqlpp new file mode 100644 index 0000000..c0e90c8 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.4.sleep.sqlpp @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +5000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.5.server.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.5.server.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.5.server.sqlpp new file mode 100644 index 0000000..aacaeaf --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.5.server.sqlpp @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + stop 10001 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.6.update.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.6.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.6.update.sqlpp new file mode 100644 index 0000000..2204437 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.6.update.sqlpp @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use test; + +upsert into Tweet ( +{ "create_at": datetime("2015-11-23T16:14:03.000Z"), + "id": 668945640186101761, + "text": "Just posted a photo @ Campus Martius Park https://t.co/5Ax4E2CdWZ", + "in_reply_to_status": -1, + "in_reply_to_user": -1, + "favorite_count": 0, + "coordinate": point("-83.04647491,42.33170228"), + "retweet_count": 0, + "lang": "en", + "is_retweet": false, + "user": { + "id": 48121888, "name": "Kevin McKague", "screen_name": "KevinOfMI", "lang": "en", "location": "Davison, Michigan", + "create_at": date("2009-06-17"), + "description": "I need", "followers_count": 1178, "friends_count": 1780, "statues_count": 22263 + }, + "place": { + "country": "United States", + "country_code": "United States", + "full_name": "Detroit, MI", + "id": "b463d3bd6064861b", + "name": "Detroit", "place_type": "city", + "bounding_box": rectangle("-83.288056,42.255085 -82.91052,42.450488") + }, + "geo_tag": { + "stateID": 0, "stateName": "Michigan", + "countyID": 26163, "countyName": "Wayne", + "cityID": 2622000, "cityName": "Detroit" + } +} +) +; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.7.server.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.7.server.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.7.server.sqlpp new file mode 100644 index 0000000..4c139d8 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.7.server.sqlpp @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Description : Continue ingest 500 records + */ + +start client 10001 file-client 127.0.0.1 ../asterix-app/data/twitter/real.2.adm 500 50 1000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.8.sleep.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.8.sleep.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.8.sleep.sqlpp new file mode 100644 index 0000000..c0e90c8 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.8.sleep.sqlpp @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +5000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.9.server.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.9.server.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.9.server.sqlpp new file mode 100644 index 0000000..aacaeaf --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.9.server.sqlpp @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + stop 10001 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/results/filters/delete/delete.1.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/filters/delete/delete.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/filters/delete/delete.1.adm new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/results/filters/upsert/upsert.1.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/filters/upsert/upsert.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/filters/upsert/upsert.1.adm new file mode 100644 index 0000000..c227083 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/filters/upsert/upsert.1.adm @@ -0,0 +1 @@ +0 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/results/index-selection/intersection-with-filter/intersection.1.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/index-selection/intersection-with-filter/intersection.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/index-selection/intersection-with-filter/intersection.1.adm new file mode 100644 index 0000000..44cc08a --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/index-selection/intersection-with-filter/intersection.1.adm @@ -0,0 +1 @@ +{ "tweetid": 9, "user": { "screen-name": "NathanGiesen@211", "lang": "en", "friends_count": 39339, "statuses_count": 473, "name": "Nathan Giesen", "followers_count": 49416, "sender-location": point("36.86,74.62") }, "send-time": datetime("2012-07-21T10:10:00.000Z"), "referred-topics": {{ "verizon", "voicemail-service" }}, "message-text": " love verizon its voicemail-service is awesome" } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml index be6827f..e986046 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml @@ -3051,6 +3051,11 @@ </compilation-unit> </test-case> <test-case FilePath="index-selection"> + <compilation-unit name="intersection-with-filter"> + <output-dir compare="Text">intersection-with-filter</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="index-selection"> <compilation-unit name="intersection_with_nodegroup"> <output-dir compare="Text">intersection</output-dir> </compilation-unit> @@ -7229,6 +7234,16 @@ <output-dir compare="Text">nested-filter-equality-predicate</output-dir> </compilation-unit> </test-case> + <test-case FilePath="filters"> + <compilation-unit name="upsert"> + <output-dir compare="Text">upsert</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="filters"> + <compilation-unit name="delete"> + <output-dir compare="Text">delete</output-dir> + </compilation-unit> + </test-case> </test-group> <test-group name="json"> <test-case FilePath="json"> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml index 4574c66..6a6ea4b 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml @@ -8511,6 +8511,16 @@ <output-dir compare="Text">nested-filter-equality-predicate</output-dir> </compilation-unit> </test-case> + <test-case FilePath="filters"> + <compilation-unit name="upsert"> + <output-dir compare="Text">upsert</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="filters"> + <compilation-unit name="delete"> + <output-dir compare="Text">delete</output-dir> + </compilation-unit> + </test-case> </test-group> <test-group name="json"> <test-case FilePath="json"> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java index 68c7e22..e2b1761 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java @@ -120,7 +120,7 @@ public class DatasetDataSource extends DataSource { int[] maxFilterFieldIndexes = createFilterIndexes(maxFilterVars, opSchema); return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, typeEnv, context, true, false, ((DatasetDataSource) dataSource).getDataset(), primaryIndex.getIndexName(), null, null, - true, true, minFilterFieldIndexes, maxFilterFieldIndexes); + true, true, false, minFilterFieldIndexes, maxFilterFieldIndexes); default: throw new AlgebricksException("Unknown datasource type"); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index e0cfc28..3b70ea9 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -427,8 +427,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName, int[] lowKeyFields, int[] highKeyFields, - boolean lowKeyInclusive, boolean highKeyInclusive, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) - throws AlgebricksException { + boolean lowKeyInclusive, boolean highKeyInclusive, boolean propagateFilter, int[] minFilterFieldIndexes, + int[] maxFilterFieldIndexes) throws AlgebricksException { boolean isSecondary = true; try { Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), @@ -456,7 +456,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, retainMissing, context.getMissingWriterFactory(), searchCallbackFactory, minFilterFieldIndexes, - maxFilterFieldIndexes, false); + maxFilterFieldIndexes, propagateFilter); } else { btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, @@ -473,7 +473,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(JobSpecification jobSpec, List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName, - int[] keyFields, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException { + int[] keyFields, boolean propagateFilter, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) + throws AlgebricksException { try { int numPrimaryKeys = dataset.getPrimaryKeys().size(); Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), @@ -498,7 +499,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> if (dataset.getDatasetType() == DatasetType.INTERNAL) { rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, keyFields, true, true, indexDataflowHelperFactory, retainInput, retainMissing, context.getMissingWriterFactory(), - searchCallbackFactory, minFilterFieldIndexes, maxFilterFieldIndexes, false); + searchCallbackFactory, minFilterFieldIndexes, maxFilterFieldIndexes, propagateFilter); } else { // Create the operator rtreeSearchOp = new ExternalRTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, keyFields, true, true, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractScanOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractScanOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractScanOperator.java index 9cd7138..5d6f40c 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractScanOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractScanOperator.java @@ -36,6 +36,11 @@ public abstract class AbstractScanOperator extends AbstractLogicalOperator { return variables; } + public List<LogicalVariable> getScanVariables() { + return variables; + } + + public void setVariables(List<LogicalVariable> variables) { this.variables = variables; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestMapOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestMapOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestMapOperator.java index 8a2981d..a8e6b44 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestMapOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestMapOperator.java @@ -35,12 +35,24 @@ public abstract class AbstractUnnestMapOperator extends AbstractUnnestOperator { protected List<LogicalVariable> minFilterVars; protected List<LogicalVariable> maxFilterVars; + protected boolean propagateIndexFilter; + public AbstractUnnestMapOperator(List<LogicalVariable> variables, Mutable<ILogicalExpression> expression, List<Object> variableTypes, boolean propagateInput) { super(variables, expression); this.expression = expression; this.variableTypes = variableTypes; this.propagateInput = propagateInput; + this.propagateIndexFilter = false; + } + + @Override + public List<LogicalVariable> getScanVariables() { + if (propagateIndexFilter) { + return variables.subList(0, variables.size() - 2); + } else { + return variables; + } } public List<Object> getVariableTypes() { @@ -98,4 +110,27 @@ public abstract class AbstractUnnestMapOperator extends AbstractUnnestOperator { return additionalFilteringExpressions; } + public void markPropagageIndexFilter() { + this.propagateIndexFilter = true; + } + + public boolean propagateIndexFilter() { + return this.propagateIndexFilter; + } + + public LogicalVariable getPropagateIndexMinFilterVar() { + if (propagateIndexFilter) { + return variables.get(variables.size() - 2); + } else { + return null; + } + } + + public LogicalVariable getPropagateIndexMaxFilterVar() { + if (propagateIndexFilter) { + return variables.get(variables.size() - 1); + } else { + return null; + } + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java index e64be2b..114fde0 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java @@ -19,7 +19,9 @@ package org.apache.hyracks.algebricks.core.algebra.operators.logical; +import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; @@ -32,22 +34,45 @@ import org.apache.hyracks.algebricks.core.algebra.typing.NonPropagatingTypeEnvir import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform; import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor; import org.apache.hyracks.algebricks.core.config.AlgebricksConfig; +import org.apache.hyracks.api.exceptions.ErrorCode; public class IntersectOperator extends AbstractLogicalOperator { private final List<List<LogicalVariable>> inputVars; + private final List<List<LogicalVariable>> compareVars; private final List<LogicalVariable> outputVars; + private List<List<LogicalVariable>> extraVars; - public IntersectOperator(List<LogicalVariable> outputVars, List<List<LogicalVariable>> inputVars) + public IntersectOperator(List<LogicalVariable> outputVars, List<List<LogicalVariable>> compareVars) throws AlgebricksException { - if (outputVars.size() != inputVars.get(0).size()) { - throw new AlgebricksException("The number of output variables is different with the input variable number"); + this(outputVars, compareVars, + compareVars.stream().map(vars -> new ArrayList<LogicalVariable>()).collect(Collectors.toList())); + } + + public IntersectOperator(List<LogicalVariable> outputVars, List<List<LogicalVariable>> compareVars, + List<List<LogicalVariable>> extraVars) throws AlgebricksException { + int numCompareFields = compareVars.get(0).size(); + if (compareVars.stream().anyMatch(vlist -> vlist.size() != numCompareFields)) { + throw AlgebricksException.create(ErrorCode.INVALID_INPUT_PARAMETER); + } + int numExtraFields = extraVars.get(0).size(); + if (extraVars.stream().anyMatch(vlist -> vlist.size() != numExtraFields)) { + throw AlgebricksException.create(ErrorCode.INVALID_INPUT_PARAMETER); + } + if (outputVars.size() != numCompareFields + numExtraFields) { + throw AlgebricksException.create(ErrorCode.INVALID_INPUT_PARAMETER); + } + + this.outputVars = new ArrayList<>(outputVars); + this.compareVars = new ArrayList<>(compareVars); + this.inputVars = new ArrayList<>(compareVars.size()); + for (List<LogicalVariable> vars : compareVars) { + this.inputVars.add(new ArrayList<>(vars)); } - if (inputVars.stream().anyMatch(vlist -> vlist.size() != outputVars.size())) { - throw new AlgebricksException("The schemas of input variables are not consistent"); + for (int i = 0; i < extraVars.size(); i++) { + this.inputVars.get(i).addAll(extraVars.get(i)); } - this.outputVars = outputVars; - this.inputVars = inputVars; + this.extraVars = extraVars; } @Override @@ -86,14 +111,20 @@ public class IntersectOperator extends AbstractLogicalOperator { IVariableTypeEnvironment typeEnv = ctx.getOutputTypeEnvironment(inputs.get(0).getValue()); for (int i = 1; i < inputs.size(); i++) { - checkTypeConsistency(typeEnv, inputVars.get(0), ctx.getOutputTypeEnvironment(inputs.get(i).getValue()), - inputVars.get(i)); + checkTypeConsistency(typeEnv, compareVars.get(0), ctx.getOutputTypeEnvironment(inputs.get(i).getValue()), + compareVars.get(i)); } - IVariableTypeEnvironment env = new NonPropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), - ctx.getMetadataProvider()); - for (int i = 0; i < outputVars.size(); i++) { - env.setVarType(outputVars.get(i), typeEnv.getVarType(inputVars.get(0).get(i))); + IVariableTypeEnvironment env = + new NonPropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), ctx.getMetadataProvider()); + int i = 0; + for (; i < compareVars.get(0).size(); i++) { + env.setVarType(outputVars.get(i), typeEnv.getVarType(compareVars.get(0).get(i))); + } + if (extraVars != null) { + for (int k = 0; k < extraVars.get(0).size(); k++) { + env.setVarType(outputVars.get(i + k), typeEnv.getVarType(extraVars.get(0).get(k))); + } } return typeEnv; } @@ -103,11 +134,19 @@ public class IntersectOperator extends AbstractLogicalOperator { } public int getNumInput() { - return inputVars.size(); + return compareVars.size(); + } + + public List<LogicalVariable> getCompareVariables(int inputIndex) { + return compareVars.get(inputIndex); + } + + public List<List<LogicalVariable>> getExtraVariables() { + return extraVars; } public List<LogicalVariable> getInputVariables(int inputIndex) { - return inputVars.get(inputIndex); + return this.inputVars.get(inputIndex); } private void checkTypeConsistency(IVariableTypeEnvironment expected, List<LogicalVariable> expectedVariables, @@ -116,8 +155,8 @@ public class IntersectOperator extends AbstractLogicalOperator { Object expectedType = expected.getVarType(expectedVariables.get(i)); Object actualType = actual.getVarType(actualVariables.get(i)); if (!expectedType.equals(actualType)) { - AlgebricksConfig.ALGEBRICKS_LOGGER - .warning("Type of two variables are not equal." + expectedVariables.get(i) + " is of type: " + AlgebricksConfig.ALGEBRICKS_LOGGER.warning( + "Type of two variables are not equal." + expectedVariables.get(i) + " is of type: " + expectedType + actualVariables.get(i) + " is of type: " + actualType); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java index 89e2423..e7fb6c0 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java @@ -51,7 +51,7 @@ public class UnnestMapOperator extends AbstractUnnestMapOperator { // this operator propagates all input variables. @Override public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException { - IVariableTypeEnvironment env = null; + IVariableTypeEnvironment env; if (propagateInput) { env = createPropagatingAllInputsTypeEnvironment(ctx); } else { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java index 5f43c3e..0baffc9 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java @@ -69,13 +69,13 @@ public class IntersectPOperator extends AbstractPhysicalOperator { for (int i = 0; i < intersectOp.getNumInput(); i++) { List<ILocalStructuralProperty> localProps = new ArrayList<>(); List<OrderColumn> orderColumns = new ArrayList<>(); - for (LogicalVariable column : intersectOp.getInputVariables(i)) { + for (LogicalVariable column : intersectOp.getCompareVariables(i)) { orderColumns.add(new OrderColumn(column, OrderOperator.IOrder.OrderKind.ASC)); } localProps.add(new LocalOrderProperty(orderColumns)); IPartitioningProperty pp = null; if (intersectOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) { - Set<LogicalVariable> partitioningVariables = new HashSet<>(intersectOp.getInputVariables(i)); + Set<LogicalVariable> partitioningVariables = new HashSet<>(intersectOp.getCompareVariables(i)); pp = new UnorderedPartitionedProperty(partitioningVariables, null); } pv[i] = new StructuralPropertiesVector(pp, localProps); @@ -108,37 +108,48 @@ public class IntersectPOperator extends AbstractPhysicalOperator { @Override public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) - throws AlgebricksException { + throws AlgebricksException { // logical op should have checked all the mismatch issues. IntersectOperator logicalOp = (IntersectOperator) op; int nInput = logicalOp.getNumInput(); int[][] compareFields = new int[nInput][]; - IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories( - logicalOp.getInputVariables(0), context.getTypeEnvironment(op), context); + IBinaryComparatorFactory[] comparatorFactories = JobGenHelper + .variablesToAscBinaryComparatorFactories(logicalOp.getCompareVariables(0), + context.getTypeEnvironment(op), context); INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider(); INormalizedKeyComputerFactory nkcf = null; if (nkcfProvider != null) { - Object type = context.getTypeEnvironment(op).getVarType(logicalOp.getInputVariables(0).get(0)); + Object type = context.getTypeEnvironment(op).getVarType(logicalOp.getCompareVariables(0).get(0)); if (type != null) { nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, true); } } for (int i = 0; i < logicalOp.getNumInput(); i++) { - compareFields[i] = JobGenHelper.variablesToFieldIndexes(logicalOp.getInputVariables(i), inputSchemas[i]); + compareFields[i] = JobGenHelper.variablesToFieldIndexes(logicalOp.getCompareVariables(i), inputSchemas[i]); + } + + int[][] extraFields = null; + if (logicalOp.getExtraVariables() != null) { + extraFields = new int[logicalOp.getNumInput()][]; + for (int i = 0; i < logicalOp.getNumInput(); i++) { + extraFields[i] = + JobGenHelper.variablesToFieldIndexes(logicalOp.getExtraVariables().get(i), inputSchemas[i]); + } } IOperatorDescriptorRegistry spec = builder.getJobSpec(); - RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, - context); + RecordDescriptor recordDescriptor = + JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context); - IntersectOperatorDescriptor opDescriptor = null; + IntersectOperatorDescriptor opDescriptor; try { - opDescriptor = new IntersectOperatorDescriptor(spec, nInput, compareFields, nkcf, comparatorFactories, - recordDescriptor); + opDescriptor = + new IntersectOperatorDescriptor(spec, nInput, compareFields, extraFields, nkcf, comparatorFactories, + recordDescriptor); } catch (HyracksException e) { throw new AlgebricksException(e); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java index fd7a8cb..5cc854c 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java @@ -35,6 +35,7 @@ import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer; import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; @@ -47,56 +48,70 @@ import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePusha /** * This intersection operator is to get the common elements from multiple way inputs. - * It will only produce the projected fields which are used for comparison. + * It will produce the projected fields which are used for comparison and also the extra fields that could + * come with the record from any input */ public class IntersectOperatorDescriptor extends AbstractOperatorDescriptor { private static final long serialVersionUID = 1L; - private final int[][] projectFields; + private final int[][] compareFields; + private final int[][] extraFields; private final INormalizedKeyComputerFactory firstKeyNormalizerFactory; private final IBinaryComparatorFactory[] comparatorFactory; /** * @param spec * @param nInputs Number of inputs - * @param compareAndProjectFields The project field list of each input. + * @param compareFields The compare field list of each input. * All the fields order should be the same with the comparatorFactories + * @param extraFields Extra field that * @param firstKeyNormalizerFactory Normalizer for the first comparison key. * @param comparatorFactories A list of comparators for each field * @param recordDescriptor * @throws HyracksException */ - public IntersectOperatorDescriptor(IOperatorDescriptorRegistry spec, int nInputs, int[][] compareAndProjectFields, - INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, - RecordDescriptor recordDescriptor) throws HyracksException { + public IntersectOperatorDescriptor(IOperatorDescriptorRegistry spec, int nInputs, int[][] compareFields, + int[][] extraFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory, + IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) throws HyracksException { super(spec, nInputs, 1); outRecDescs[0] = recordDescriptor; - validateParameters(compareAndProjectFields, comparatorFactories); + validateParameters(compareFields, comparatorFactories, extraFields); - this.projectFields = compareAndProjectFields; + this.compareFields = compareFields; + this.extraFields = extraFields; this.firstKeyNormalizerFactory = firstKeyNormalizerFactory; this.comparatorFactory = comparatorFactories; } - private void validateParameters(int[][] compareAndProjectFields, IBinaryComparatorFactory[] comparatorFactories) - throws HyracksException { + private void validateParameters(int[][] compareFields, IBinaryComparatorFactory[] comparatorFactories, + int[][] extraFields) throws HyracksException { - int firstLength = compareAndProjectFields[0].length; - for (int[] fields : compareAndProjectFields) { + int firstLength = compareFields[0].length; + for (int[] fields : compareFields) { if (fields.length != firstLength) { - throw new HyracksException("The given input comparison fields is not equal"); + throw HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER); } for (int fid : fields) { if (fid < 0) { - throw new HyracksException("Invalid field index in given comparison fields array"); + throw HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER); } } } if (firstLength != comparatorFactories.length) { - throw new HyracksException("The size of given fields is not equal with the number of comparators"); + throw HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER); } + + if (extraFields != null) { + firstLength = extraFields[0].length; + for (int[] fields : extraFields) { + if (fields.length != firstLength) { + throw HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER); + } + } + } + } @Override @@ -125,7 +140,7 @@ public class IntersectOperatorDescriptor extends AbstractOperatorDescriptor { for (int i = 0; i < inputRecordDesc.length; i++) { inputRecordDesc[i] = recordDescProvider.getInputRecordDescriptor(getActivityId(), i); } - return new IntersectOperatorNodePushable(ctx, inputArity, inputRecordDesc, projectFields, + return new IntersectOperatorNodePushable(ctx, inputArity, inputRecordDesc, compareFields, extraFields, firstKeyNormalizerFactory, comparatorFactory); } } @@ -135,7 +150,8 @@ public class IntersectOperatorDescriptor extends AbstractOperatorDescriptor { private enum ACTION {FAILED, CLOSE} private final int inputArity; - private final int[][] projectFields; + private final int[][] compareFields; + private final int[][] allProjectFields; private final BitSet consumed; private final int[] tupleIndexMarker; private final FrameTupleAccessor[] refAccessor; @@ -147,16 +163,32 @@ public class IntersectOperatorDescriptor extends AbstractOperatorDescriptor { private boolean done = false; public IntersectOperatorNodePushable(IHyracksTaskContext ctx, int inputArity, - RecordDescriptor[] inputRecordDescriptors, int[][] projectFields, + RecordDescriptor[] inputRecordDescriptors, int[][] compareFields, int[][] extraFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactory) throws HyracksDataException { this.inputArity = inputArity; - this.projectFields = projectFields; + this.compareFields = compareFields; + + int[][] projectedFields = compareFields; + if (extraFields != null) { + projectedFields = new int[inputArity][]; + for (int input = 0; input < inputArity; input++) { + projectedFields[input] = new int[compareFields[input].length + extraFields[input].length]; + int j = 0; + for (; j < compareFields[input].length; j++) { + projectedFields[input][j] = compareFields[input][j]; + } + for (int k = 0; k < extraFields[input].length; k++) { + projectedFields[input][j + k] = extraFields[input][k]; + } + } + } + this.allProjectFields = projectedFields; this.firstKeyNormalizerComputer = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer(); - comparators = new IBinaryComparator[projectFields[0].length]; + comparators = new IBinaryComparator[compareFields[0].length]; for (int i = 0; i < comparators.length; i++) { comparators[i] = comparatorFactory[i].createBinaryComparator(); } @@ -241,8 +273,9 @@ public class IntersectOperatorDescriptor extends AbstractOperatorDescriptor { continue; } while (tupleIndexMarker[i] < refAccessor[i].getTupleCount()) { - int cmp = compare(i, refAccessor[i], tupleIndexMarker[i], maxInput, - refAccessor[maxInput], tupleIndexMarker[maxInput]); + int cmp = + compare(i, refAccessor[i], tupleIndexMarker[i], maxInput, refAccessor[maxInput], + tupleIndexMarker[maxInput]); if (cmp == 0) { match++; break; @@ -260,7 +293,7 @@ public class IntersectOperatorDescriptor extends AbstractOperatorDescriptor { } if (match == inputArity) { FrameUtils.appendProjectionToWriter(writer, appender, refAccessor[maxInput], - tupleIndexMarker[maxInput], projectFields[maxInput]); + tupleIndexMarker[maxInput], allProjectFields[maxInput]); for (int i = 0; i < inputArity; i++) { tupleIndexMarker[i]++; if (tupleIndexMarker[i] >= refAccessor[i].getTupleCount()) { @@ -291,11 +324,11 @@ public class IntersectOperatorDescriptor extends AbstractOperatorDescriptor { for (int i = 0; i < comparators.length; i++) { int cmp = comparators[i].compare(frameTupleAccessor1.getBuffer().array(), - frameTupleAccessor1.getAbsoluteFieldStartOffset(tid1, projectFields[input1][i]), - frameTupleAccessor1.getFieldLength(tid1, projectFields[input1][i]), + frameTupleAccessor1.getAbsoluteFieldStartOffset(tid1, compareFields[input1][i]), + frameTupleAccessor1.getFieldLength(tid1, compareFields[input1][i]), frameTupleAccessor2.getBuffer().array(), - frameTupleAccessor2.getAbsoluteFieldStartOffset(tid2, projectFields[input2][i]), - frameTupleAccessor2.getFieldLength(tid2, projectFields[input2][i])); + frameTupleAccessor2.getAbsoluteFieldStartOffset(tid2, compareFields[input2][i]), + frameTupleAccessor2.getFieldLength(tid2, compareFields[input2][i])); if (cmp != 0) { return cmp; @@ -308,8 +341,8 @@ public class IntersectOperatorDescriptor extends AbstractOperatorDescriptor { return firstKeyNormalizerComputer == null ? 0 : firstKeyNormalizerComputer.normalize(frameTupleAccessor1.getBuffer().array(), - frameTupleAccessor1.getAbsoluteFieldStartOffset(tid1, projectFields[inputId1][0]), - frameTupleAccessor1.getFieldLength(tid1, projectFields[inputId1][0])); + frameTupleAccessor1.getAbsoluteFieldStartOffset(tid1, compareFields[inputId1][0]), + frameTupleAccessor1.getFieldLength(tid1, compareFields[inputId1][0])); } private int findMaxInput() throws HyracksDataException { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/IntersectOperatorDescriptorTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/IntersectOperatorDescriptorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/IntersectOperatorDescriptorTest.java index 0c49588..6729713 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/IntersectOperatorDescriptorTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/IntersectOperatorDescriptorTest.java @@ -108,7 +108,7 @@ public class IntersectOperatorDescriptorTest { public void testNormalOperatorInitialization() throws HyracksException { IntersectOperatorDescriptor operatorDescriptor = new IntersectOperatorDescriptor(mockRegistry, nInputs, - compareFields, normalizedKeyFactory, comparatorFactory, outRecordDescriptor); + compareFields, null, normalizedKeyFactory, comparatorFactory, outRecordDescriptor); assertEquals(nInputs, operatorDescriptor.getInputArity()); } @@ -158,7 +158,7 @@ public class IntersectOperatorDescriptorTest { private void executeAndVerifyResult(List<IFrame>[] inputFrames, List<Object[]> answer) throws Exception { IntersectOperatorDescriptor.IntersectOperatorNodePushable pushable = new IntersectOperatorDescriptor.IntersectOperatorNodePushable(ctx, nInputs, inputRecordDescriptor, - compareFields, null, comparatorFactory); + compareFields, null, null, comparatorFactory); assertEquals(nInputs, pushable.getInputArity()); IFrameWriter[] writers = new IFrameWriter[nInputs];
