Repository: asterixdb
Updated Branches:
  refs/heads/master 8cbb05cec -> de0ece7f1


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.5.server.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.5.server.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.5.server.sqlpp
new file mode 100644
index 0000000..aacaeaf
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.5.server.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ stop 10001

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.6.update.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.6.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.6.update.sqlpp
new file mode 100644
index 0000000..cc12612
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.6.update.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+DELETE FROM Tweet t WHERE t.id = 668945640186101761;
+

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.7.server.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.7.server.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.7.server.sqlpp
new file mode 100644
index 0000000..9405846
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.7.server.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Continue ingesting 500 records
+ */
+
+start client 10001 file-client 127.0.0.1 
../asterix-app/data/twitter/real.2.adm 500 50 1000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.8.sleep.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.8.sleep.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.8.sleep.sqlpp
new file mode 100644
index 0000000..c0e90c8
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.8.sleep.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+5000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.9.server.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.9.server.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.9.server.sqlpp
new file mode 100644
index 0000000..aacaeaf
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/delete/delete.9.server.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ stop 10001

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.1.ddl.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.1.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.1.ddl.sqlpp
new file mode 100644
index 0000000..417f841
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.1.ddl.sqlpp
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * This test case verifies that the filter optimization rule is still correct when 
there
+ * is an upsert on some of the components. The new value should be returned.
+ *
+ * 1. create the dataset Tweet that is ingested by a feed.
+ * 2. update one tweet
+ * 3. start the feed again to make the previous component flush to disk
+ * 4. query by the old value to see whether any record is returned
+ */
+
+
+drop dataverse test if exists;
+create dataverse test if not exists;
+use test;
+
+create type typeUser if not exists as open {
+    id: int64,
+    name: string,
+    screen_name : string,
+    lang : string,
+    location: string,
+    create_at: date,
+    description: string,
+    followers_count: int32,
+    friends_count: int32,
+    statues_count: int64
+}
+
+create type typePlace if not exists as open{
+    country : string,
+    country_code : string,
+    full_name : string,
+    id : string,
+    name : string,
+    place_type : string,
+    bounding_box : rectangle
+}
+
+create type typeGeoTag if not exists as open {
+    stateID: int32,
+    stateName: string,
+    countyID: int32,
+    countyName: string,
+    cityID: int32?,
+    cityName: string?
+}
+
+create type typeTweet if not exists as open{
+    create_at : datetime,
+    id: int64,
+    `text`: string,
+    in_reply_to_status : int64,
+    in_reply_to_user : int64,
+    favorite_count : int64,
+    coordinate: point?,
+    retweet_count : int64,
+    lang : string,
+    is_retweet: boolean,
+    hashtags : {{ string }} ?,
+    user_mentions : {{ int64 }} ? ,
+    user : typeUser,
+    place : typePlace?,
+    geo_tag: typeGeoTag
+}
+
+create dataset Tweet(typeTweet) primary key id
+using compaction policy prefix 
(("max-mergable-component-size"="32768"),("max-tolerance-component-count"="32"))
+with filter on create_at;
+
+create index text_idx if not exists on Tweet(`text`) type btree;
+create index state_idx if not exists on Tweet(geo_tag.stateID) type btree;
+
+create feed TweetFeed using socket_adapter
+(
+    ("sockets"="127.0.0.1:10001"),
+    ("address-type"="IP"),
+    ("type-name"="typeTweet"),
+    ("format"="adm")
+);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.10.query.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.10.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.10.query.sqlpp
new file mode 100644
index 0000000..4846e8c
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.10.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+SELECT value m.geo_tag.stateID from Tweet m
+WHERE m.`text` = "Just posted a photo @ Campus Martius Park 
https://t.co/5Ax4E2CdWZ"
+;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.11.ddl.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.11.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.11.ddl.sqlpp
new file mode 100644
index 0000000..da9bd3a
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.11.ddl.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 24th Feb 2016
+ */
+
+drop dataverse test;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.2.update.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.2.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.2.update.sqlpp
new file mode 100644
index 0000000..54a8a2c
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.2.update.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+set `wait-for-completion-feed` "false";
+
+connect feed TweetFeed to dataset Tweet;
+
+start feed TweetFeed;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.3.server.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.3.server.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.3.server.sqlpp
new file mode 100644
index 0000000..22fbc4a
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.3.server.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 1000 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ */
+
+start client 10001 file-client 127.0.0.1 ../asterix-app/data/twitter/real.adm 
1000 100 900

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.4.sleep.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.4.sleep.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.4.sleep.sqlpp
new file mode 100644
index 0000000..c0e90c8
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.4.sleep.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+5000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.5.server.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.5.server.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.5.server.sqlpp
new file mode 100644
index 0000000..aacaeaf
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.5.server.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ stop 10001

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.6.update.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.6.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.6.update.sqlpp
new file mode 100644
index 0000000..2204437
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.6.update.sqlpp
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+upsert into Tweet (
+{ "create_at": datetime("2015-11-23T16:14:03.000Z"),
+  "id": 668945640186101761,
+  "text": "Just posted a photo @ Campus Martius Park https://t.co/5Ax4E2CdWZ",
+  "in_reply_to_status": -1,
+  "in_reply_to_user": -1,
+  "favorite_count": 0,
+  "coordinate": point("-83.04647491,42.33170228"),
+  "retweet_count": 0,
+  "lang": "en",
+  "is_retweet": false,
+  "user": {
+    "id": 48121888, "name": "Kevin McKague", "screen_name": "KevinOfMI", 
"lang": "en", "location": "Davison, Michigan",
+    "create_at": date("2009-06-17"),
+    "description": "I need", "followers_count": 1178, "friends_count": 1780, 
"statues_count": 22263
+  },
+  "place": {
+    "country": "United States",
+    "country_code": "United States",
+    "full_name": "Detroit, MI",
+    "id": "b463d3bd6064861b",
+    "name": "Detroit", "place_type": "city",
+    "bounding_box": rectangle("-83.288056,42.255085 -82.91052,42.450488")
+  },
+  "geo_tag": {
+    "stateID": 0, "stateName": "Michigan",
+    "countyID": 26163, "countyName": "Wayne",
+    "cityID": 2622000, "cityName": "Detroit"
+  }
+}
+)
+;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.7.server.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.7.server.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.7.server.sqlpp
new file mode 100644
index 0000000..4c139d8
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.7.server.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Continue ingest 500 records
+ */
+
+start client 10001 file-client 127.0.0.1 
../asterix-app/data/twitter/real.2.adm 500 50 1000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.8.sleep.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.8.sleep.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.8.sleep.sqlpp
new file mode 100644
index 0000000..c0e90c8
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.8.sleep.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+5000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.9.server.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.9.server.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.9.server.sqlpp
new file mode 100644
index 0000000..aacaeaf
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/filters/upsert/upsert.9.server.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ stop 10001

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/results/filters/delete/delete.1.adm
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/filters/delete/delete.1.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/filters/delete/delete.1.adm
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/results/filters/upsert/upsert.1.adm
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/filters/upsert/upsert.1.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/filters/upsert/upsert.1.adm
new file mode 100644
index 0000000..c227083
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/filters/upsert/upsert.1.adm
@@ -0,0 +1 @@
+0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/results/index-selection/intersection-with-filter/intersection.1.adm
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/index-selection/intersection-with-filter/intersection.1.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/index-selection/intersection-with-filter/intersection.1.adm
new file mode 100644
index 0000000..44cc08a
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/index-selection/intersection-with-filter/intersection.1.adm
@@ -0,0 +1 @@
+{ "tweetid": 9, "user": { "screen-name": "NathanGiesen@211", "lang": "en", 
"friends_count": 39339, "statuses_count": 473, "name": "Nathan Giesen", 
"followers_count": 49416, "sender-location": point("36.86,74.62") }, 
"send-time": datetime("2012-07-21T10:10:00.000Z"), "referred-topics": {{ 
"verizon", "voicemail-service" }}, "message-text": " love verizon its 
voicemail-service is awesome" }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index be6827f..e986046 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -3051,6 +3051,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="index-selection">
+      <compilation-unit name="intersection-with-filter">
+        <output-dir compare="Text">intersection-with-filter</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="index-selection">
       <compilation-unit name="intersection_with_nodegroup">
         <output-dir compare="Text">intersection</output-dir>
       </compilation-unit>
@@ -7229,6 +7234,16 @@
         <output-dir 
compare="Text">nested-filter-equality-predicate</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="filters">
+      <compilation-unit name="upsert">
+        <output-dir compare="Text">upsert</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="filters">
+      <compilation-unit name="delete">
+        <output-dir compare="Text">delete</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="json">
     <test-case FilePath="json">

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 4574c66..6a6ea4b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -8511,6 +8511,16 @@
         <output-dir 
compare="Text">nested-filter-equality-predicate</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="filters">
+      <compilation-unit name="upsert">
+        <output-dir compare="Text">upsert</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="filters">
+      <compilation-unit name="delete">
+        <output-dir compare="Text">delete</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="json">
     <test-case FilePath="json">

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index 68c7e22..e2b1761 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -120,7 +120,7 @@ public class DatasetDataSource extends DataSource {
                 int[] maxFilterFieldIndexes = 
createFilterIndexes(maxFilterVars, opSchema);
                 return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, 
typeEnv, context, true,
                         false, ((DatasetDataSource) dataSource).getDataset(), 
primaryIndex.getIndexName(), null, null,
-                        true, true, minFilterFieldIndexes, 
maxFilterFieldIndexes);
+                        true, true, false, minFilterFieldIndexes, 
maxFilterFieldIndexes);
             default:
                 throw new AlgebricksException("Unknown datasource type");
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index e0cfc28..3b70ea9 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -427,8 +427,8 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildBtreeRuntime(JobSpecification jobSpec,
             IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, 
JobGenContext context, boolean retainInput,
             boolean retainMissing, Dataset dataset, String indexName, int[] 
lowKeyFields, int[] highKeyFields,
-            boolean lowKeyInclusive, boolean highKeyInclusive, int[] 
minFilterFieldIndexes, int[] maxFilterFieldIndexes)
-            throws AlgebricksException {
+            boolean lowKeyInclusive, boolean highKeyInclusive, boolean 
propagateFilter, int[] minFilterFieldIndexes,
+            int[] maxFilterFieldIndexes) throws AlgebricksException {
         boolean isSecondary = true;
         try {
             Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(),
@@ -456,7 +456,7 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                 btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, 
outputRecDesc, lowKeyFields, highKeyFields,
                         lowKeyInclusive, highKeyInclusive, indexHelperFactory, 
retainInput, retainMissing,
                         context.getMissingWriterFactory(), 
searchCallbackFactory, minFilterFieldIndexes,
-                        maxFilterFieldIndexes, false);
+                        maxFilterFieldIndexes, propagateFilter);
             } else {
                 btreeSearchOp = new 
ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields,
                         highKeyFields, lowKeyInclusive, highKeyInclusive, 
indexHelperFactory, retainInput,
@@ -473,7 +473,8 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildRtreeRuntime(JobSpecification jobSpec,
             List<LogicalVariable> outputVars, IOperatorSchema opSchema, 
IVariableTypeEnvironment typeEnv,
             JobGenContext context, boolean retainInput, boolean retainMissing, 
Dataset dataset, String indexName,
-            int[] keyFields, int[] minFilterFieldIndexes, int[] 
maxFilterFieldIndexes) throws AlgebricksException {
+            int[] keyFields, boolean propagateFilter, int[] 
minFilterFieldIndexes, int[] maxFilterFieldIndexes)
+            throws AlgebricksException {
         try {
             int numPrimaryKeys = dataset.getPrimaryKeys().size();
             Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(),
@@ -498,7 +499,7 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             if (dataset.getDatasetType() == DatasetType.INTERNAL) {
                 rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, 
outputRecDesc, keyFields, true, true,
                         indexDataflowHelperFactory, retainInput, 
retainMissing, context.getMissingWriterFactory(),
-                        searchCallbackFactory, minFilterFieldIndexes, 
maxFilterFieldIndexes, false);
+                        searchCallbackFactory, minFilterFieldIndexes, 
maxFilterFieldIndexes, propagateFilter);
             } else {
                 // Create the operator
                 rtreeSearchOp = new 
ExternalRTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, keyFields, true, 
true,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractScanOperator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractScanOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractScanOperator.java
index 9cd7138..5d6f40c 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractScanOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractScanOperator.java
@@ -36,6 +36,11 @@ public abstract class AbstractScanOperator extends 
AbstractLogicalOperator {
         return variables;
     }
 
+    public List<LogicalVariable> getScanVariables() {
+        return variables;
+    }
+
+
     public void setVariables(List<LogicalVariable> variables) {
         this.variables = variables;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestMapOperator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestMapOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestMapOperator.java
index 8a2981d..a8e6b44 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestMapOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestMapOperator.java
@@ -35,12 +35,24 @@ public abstract class AbstractUnnestMapOperator extends 
AbstractUnnestOperator {
     protected List<LogicalVariable> minFilterVars;
     protected List<LogicalVariable> maxFilterVars;
 
+    protected boolean propagateIndexFilter;
+
     public AbstractUnnestMapOperator(List<LogicalVariable> variables, 
Mutable<ILogicalExpression> expression,
             List<Object> variableTypes, boolean propagateInput) {
         super(variables, expression);
         this.expression = expression;
         this.variableTypes = variableTypes;
         this.propagateInput = propagateInput;
+        this.propagateIndexFilter = false;
+    }
+
+    @Override
+    public List<LogicalVariable> getScanVariables() {
+        if (propagateIndexFilter) {
+            return variables.subList(0, variables.size() - 2);
+        } else {
+            return variables;
+        }
     }
 
     public List<Object> getVariableTypes() {
@@ -98,4 +110,27 @@ public abstract class AbstractUnnestMapOperator extends 
AbstractUnnestOperator {
         return additionalFilteringExpressions;
     }
 
+    public void markPropagageIndexFilter() {
+        this.propagateIndexFilter = true;
+    }
+
+    public boolean propagateIndexFilter() {
+        return this.propagateIndexFilter;
+    }
+
+    public LogicalVariable getPropagateIndexMinFilterVar() {
+        if (propagateIndexFilter) {
+            return variables.get(variables.size() - 2);
+        } else {
+            return null;
+        }
+    }
+
+    public LogicalVariable getPropagateIndexMaxFilterVar() {
+        if (propagateIndexFilter) {
+            return variables.get(variables.size() - 1);
+        } else {
+            return null;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java
index e64be2b..114fde0 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java
@@ -19,7 +19,9 @@
 
 package org.apache.hyracks.algebricks.core.algebra.operators.logical;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
@@ -32,22 +34,45 @@ import 
org.apache.hyracks.algebricks.core.algebra.typing.NonPropagatingTypeEnvir
 import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 
 public class IntersectOperator extends AbstractLogicalOperator {
 
     private final List<List<LogicalVariable>> inputVars;
+    private final List<List<LogicalVariable>> compareVars;
     private final List<LogicalVariable> outputVars;
+    private List<List<LogicalVariable>> extraVars;
 
-    public IntersectOperator(List<LogicalVariable> outputVars, 
List<List<LogicalVariable>> inputVars)
+    public IntersectOperator(List<LogicalVariable> outputVars, 
List<List<LogicalVariable>> compareVars)
             throws AlgebricksException {
-        if (outputVars.size() != inputVars.get(0).size()) {
-            throw new AlgebricksException("The number of output variables is 
different with the input variable number");
+        this(outputVars, compareVars,
+                compareVars.stream().map(vars -> new 
ArrayList<LogicalVariable>()).collect(Collectors.toList()));
+    }
+
+    public IntersectOperator(List<LogicalVariable> outputVars, 
List<List<LogicalVariable>> compareVars,
+            List<List<LogicalVariable>> extraVars) throws AlgebricksException {
+        int numCompareFields = compareVars.get(0).size();
+        if (compareVars.stream().anyMatch(vlist -> vlist.size() != 
numCompareFields)) {
+            throw 
AlgebricksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
+        }
+        int numExtraFields = extraVars.get(0).size();
+        if (extraVars.stream().anyMatch(vlist -> vlist.size() != 
numExtraFields)) {
+            throw 
AlgebricksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
+        }
+        if (outputVars.size() != numCompareFields + numExtraFields) {
+            throw 
AlgebricksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
+        }
+
+        this.outputVars = new ArrayList<>(outputVars);
+        this.compareVars = new ArrayList<>(compareVars);
+        this.inputVars = new ArrayList<>(compareVars.size());
+        for (List<LogicalVariable> vars : compareVars) {
+            this.inputVars.add(new ArrayList<>(vars));
         }
-        if (inputVars.stream().anyMatch(vlist -> vlist.size() != 
outputVars.size())) {
-            throw new AlgebricksException("The schemas of input variables are 
not consistent");
+        for (int i = 0; i < extraVars.size(); i++) {
+            this.inputVars.get(i).addAll(extraVars.get(i));
         }
-        this.outputVars = outputVars;
-        this.inputVars = inputVars;
+        this.extraVars = extraVars;
     }
 
     @Override
@@ -86,14 +111,20 @@ public class IntersectOperator extends 
AbstractLogicalOperator {
         IVariableTypeEnvironment typeEnv = 
ctx.getOutputTypeEnvironment(inputs.get(0).getValue());
 
         for (int i = 1; i < inputs.size(); i++) {
-            checkTypeConsistency(typeEnv, inputVars.get(0), 
ctx.getOutputTypeEnvironment(inputs.get(i).getValue()),
-                    inputVars.get(i));
+            checkTypeConsistency(typeEnv, compareVars.get(0), 
ctx.getOutputTypeEnvironment(inputs.get(i).getValue()),
+                    compareVars.get(i));
         }
 
-        IVariableTypeEnvironment env = new 
NonPropagatingTypeEnvironment(ctx.getExpressionTypeComputer(),
-                ctx.getMetadataProvider());
-        for (int i = 0; i < outputVars.size(); i++) {
-            env.setVarType(outputVars.get(i), 
typeEnv.getVarType(inputVars.get(0).get(i)));
+        IVariableTypeEnvironment env =
+                new 
NonPropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), 
ctx.getMetadataProvider());
+        int i = 0;
+        for (; i < compareVars.get(0).size(); i++) {
+            env.setVarType(outputVars.get(i), 
typeEnv.getVarType(compareVars.get(0).get(i)));
+        }
+        if (extraVars != null) {
+            for (int k = 0; k < extraVars.get(0).size(); k++) {
+                env.setVarType(outputVars.get(i + k), 
typeEnv.getVarType(extraVars.get(0).get(k)));
+            }
         }
         return typeEnv;
     }
@@ -103,11 +134,19 @@ public class IntersectOperator extends 
AbstractLogicalOperator {
     }
 
     public int getNumInput() {
-        return inputVars.size();
+        return compareVars.size();
+    }
+
+    public List<LogicalVariable> getCompareVariables(int inputIndex) {
+        return compareVars.get(inputIndex);
+    }
+
+    public List<List<LogicalVariable>> getExtraVariables() {
+        return extraVars;
     }
 
     public List<LogicalVariable> getInputVariables(int inputIndex) {
-        return inputVars.get(inputIndex);
+        return this.inputVars.get(inputIndex);
     }
 
     private void checkTypeConsistency(IVariableTypeEnvironment expected, 
List<LogicalVariable> expectedVariables,
@@ -116,8 +155,8 @@ public class IntersectOperator extends 
AbstractLogicalOperator {
             Object expectedType = 
expected.getVarType(expectedVariables.get(i));
             Object actualType = actual.getVarType(actualVariables.get(i));
             if (!expectedType.equals(actualType)) {
-                AlgebricksConfig.ALGEBRICKS_LOGGER
-                        .warning("Type of two variables are not equal." + 
expectedVariables.get(i) + " is of type: "
+                AlgebricksConfig.ALGEBRICKS_LOGGER.warning(
+                        "Type of two variables are not equal." + 
expectedVariables.get(i) + " is of type: "
                                 + expectedType + actualVariables.get(i) + " is 
of type: " + actualType);
             }
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java
index 89e2423..e7fb6c0 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java
@@ -51,7 +51,7 @@ public class UnnestMapOperator extends 
AbstractUnnestMapOperator {
     // this operator propagates all input variables.
     @Override
     public IVariableTypeEnvironment 
computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
-        IVariableTypeEnvironment env = null;
+        IVariableTypeEnvironment env;
         if (propagateInput) {
             env = createPropagatingAllInputsTypeEnvironment(ctx);
         } else {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
index 5f43c3e..0baffc9 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
@@ -69,13 +69,13 @@ public class IntersectPOperator extends 
AbstractPhysicalOperator {
         for (int i = 0; i < intersectOp.getNumInput(); i++) {
             List<ILocalStructuralProperty> localProps = new ArrayList<>();
             List<OrderColumn> orderColumns = new ArrayList<>();
-            for (LogicalVariable column : intersectOp.getInputVariables(i)) {
+            for (LogicalVariable column : intersectOp.getCompareVariables(i)) {
                 orderColumns.add(new OrderColumn(column, 
OrderOperator.IOrder.OrderKind.ASC));
             }
             localProps.add(new LocalOrderProperty(orderColumns));
             IPartitioningProperty pp = null;
             if (intersectOp.getExecutionMode() == 
AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
-                Set<LogicalVariable> partitioningVariables = new 
HashSet<>(intersectOp.getInputVariables(i));
+                Set<LogicalVariable> partitioningVariables = new 
HashSet<>(intersectOp.getCompareVariables(i));
                 pp = new UnorderedPartitionedProperty(partitioningVariables, 
null);
             }
             pv[i] = new StructuralPropertiesVector(pp, localProps);
@@ -108,37 +108,48 @@ public class IntersectPOperator extends 
AbstractPhysicalOperator {
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, 
JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         // logical op should have checked all the mismatch issues.
         IntersectOperator logicalOp = (IntersectOperator) op;
         int nInput = logicalOp.getNumInput();
         int[][] compareFields = new int[nInput][];
 
-        IBinaryComparatorFactory[] comparatorFactories = 
JobGenHelper.variablesToAscBinaryComparatorFactories(
-                logicalOp.getInputVariables(0), 
context.getTypeEnvironment(op), context);
+        IBinaryComparatorFactory[] comparatorFactories = JobGenHelper
+                
.variablesToAscBinaryComparatorFactories(logicalOp.getCompareVariables(0),
+                        context.getTypeEnvironment(op), context);
 
         INormalizedKeyComputerFactoryProvider nkcfProvider = 
context.getNormalizedKeyComputerFactoryProvider();
         INormalizedKeyComputerFactory nkcf = null;
 
         if (nkcfProvider != null) {
-            Object type = 
context.getTypeEnvironment(op).getVarType(logicalOp.getInputVariables(0).get(0));
+            Object type = 
context.getTypeEnvironment(op).getVarType(logicalOp.getCompareVariables(0).get(0));
             if (type != null) {
                 nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, 
true);
             }
         }
 
         for (int i = 0; i < logicalOp.getNumInput(); i++) {
-            compareFields[i] = 
JobGenHelper.variablesToFieldIndexes(logicalOp.getInputVariables(i), 
inputSchemas[i]);
+            compareFields[i] = 
JobGenHelper.variablesToFieldIndexes(logicalOp.getCompareVariables(i), 
inputSchemas[i]);
+        }
+
+        int[][] extraFields = null;
+        if (logicalOp.getExtraVariables() != null) {
+            extraFields = new int[logicalOp.getNumInput()][];
+            for (int i = 0; i < logicalOp.getNumInput(); i++) {
+                extraFields[i] =
+                        
JobGenHelper.variablesToFieldIndexes(logicalOp.getExtraVariables().get(i), 
inputSchemas[i]);
+            }
         }
 
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
-        RecordDescriptor recordDescriptor = 
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
-                context);
+        RecordDescriptor recordDescriptor =
+                
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, 
context);
 
-        IntersectOperatorDescriptor opDescriptor = null;
+        IntersectOperatorDescriptor opDescriptor;
         try {
-            opDescriptor = new IntersectOperatorDescriptor(spec, nInput, 
compareFields, nkcf, comparatorFactories,
-                    recordDescriptor);
+            opDescriptor =
+                    new IntersectOperatorDescriptor(spec, nInput, 
compareFields, extraFields, nkcf, comparatorFactories,
+                            recordDescriptor);
         } catch (HyracksException e) {
             throw new AlgebricksException(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
index fd7a8cb..5cc854c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
@@ -35,6 +35,7 @@ import 
org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -47,56 +48,70 @@ import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePusha
 
 /**
  * This intersection operator is to get the common elements from multiple way 
inputs.
- * It will only produce the projected fields which are used for comparison.
+ * It will produce the projected fields which are used for comparison and also 
the extra fields that could
+ * come with the record from any input.
  */
 public class IntersectOperatorDescriptor extends AbstractOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
-    private final int[][] projectFields;
+    private final int[][] compareFields;
+    private final int[][] extraFields;
     private final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
     private final IBinaryComparatorFactory[] comparatorFactory;
 
     /**
      * @param spec
      * @param nInputs                   Number of inputs
-     * @param compareAndProjectFields   The project field list of each input.
+     * @param compareFields             The compare field list of each input.
      *                                  All the fields order should be the 
same with the comparatorFactories
+     * @param extraFields               Extra fields of each input that are projected after the compare fields but not compared; may be null.
      * @param firstKeyNormalizerFactory Normalizer for the first comparison 
key.
      * @param comparatorFactories       A list of comparators for each field
      * @param recordDescriptor
      * @throws HyracksException
      */
-    public IntersectOperatorDescriptor(IOperatorDescriptorRegistry spec, int 
nInputs, int[][] compareAndProjectFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, 
IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor) throws HyracksException {
+    public IntersectOperatorDescriptor(IOperatorDescriptorRegistry spec, int 
nInputs, int[][] compareFields,
+            int[][] extraFields, INormalizedKeyComputerFactory 
firstKeyNormalizerFactory,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor 
recordDescriptor) throws HyracksException {
         super(spec, nInputs, 1);
         outRecDescs[0] = recordDescriptor;
 
-        validateParameters(compareAndProjectFields, comparatorFactories);
+        validateParameters(compareFields, comparatorFactories, extraFields);
 
-        this.projectFields = compareAndProjectFields;
+        this.compareFields = compareFields;
+        this.extraFields = extraFields;
         this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
         this.comparatorFactory = comparatorFactories;
     }
 
-    private void validateParameters(int[][] compareAndProjectFields, 
IBinaryComparatorFactory[] comparatorFactories)
-            throws HyracksException {
+    private void validateParameters(int[][] compareFields, 
IBinaryComparatorFactory[] comparatorFactories,
+            int[][] extraFields) throws HyracksException {
 
-        int firstLength = compareAndProjectFields[0].length;
-        for (int[] fields : compareAndProjectFields) {
+        int firstLength = compareFields[0].length;
+        for (int[] fields : compareFields) {
             if (fields.length != firstLength) {
-                throw new HyracksException("The given input comparison fields 
is not equal");
+                throw 
HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
             }
             for (int fid : fields) {
                 if (fid < 0) {
-                    throw new HyracksException("Invalid field index in given 
comparison fields array");
+                    throw 
HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
                 }
             }
         }
 
         if (firstLength != comparatorFactories.length) {
-            throw new HyracksException("The size of given fields is not equal 
with the number of comparators");
+            throw HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
         }
+
+        if (extraFields != null) {
+            firstLength = extraFields[0].length;
+            for (int[] fields : extraFields) {
+                if (fields.length != firstLength) {
+                    throw 
HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
+                }
+            }
+        }
+
     }
 
     @Override
@@ -125,7 +140,7 @@ public class IntersectOperatorDescriptor extends 
AbstractOperatorDescriptor {
             for (int i = 0; i < inputRecordDesc.length; i++) {
                 inputRecordDesc[i] = 
recordDescProvider.getInputRecordDescriptor(getActivityId(), i);
             }
-            return new IntersectOperatorNodePushable(ctx, inputArity, 
inputRecordDesc, projectFields,
+            return new IntersectOperatorNodePushable(ctx, inputArity, 
inputRecordDesc, compareFields, extraFields,
                     firstKeyNormalizerFactory, comparatorFactory);
         }
     }
@@ -135,7 +150,8 @@ public class IntersectOperatorDescriptor extends 
AbstractOperatorDescriptor {
         private enum ACTION {FAILED, CLOSE}
 
         private final int inputArity;
-        private final int[][] projectFields;
+        private final int[][] compareFields;
+        private final int[][] allProjectFields;
         private final BitSet consumed;
         private final int[] tupleIndexMarker;
         private final FrameTupleAccessor[] refAccessor;
@@ -147,16 +163,32 @@ public class IntersectOperatorDescriptor extends 
AbstractOperatorDescriptor {
         private boolean done = false;
 
         public IntersectOperatorNodePushable(IHyracksTaskContext ctx, int 
inputArity,
-                RecordDescriptor[] inputRecordDescriptors, int[][] 
projectFields,
+                RecordDescriptor[] inputRecordDescriptors, int[][] 
compareFields, int[][] extraFields,
                 INormalizedKeyComputerFactory firstKeyNormalizerFactory, 
IBinaryComparatorFactory[] comparatorFactory)
                 throws HyracksDataException {
 
             this.inputArity = inputArity;
-            this.projectFields = projectFields;
+            this.compareFields = compareFields;
+
+            int[][] projectedFields = compareFields;
+            if (extraFields != null) {
+                projectedFields = new int[inputArity][];
+                for (int input = 0; input < inputArity; input++) {
+                    projectedFields[input] = new 
int[compareFields[input].length + extraFields[input].length];
+                    int j = 0;
+                    for (; j < compareFields[input].length; j++) {
+                        projectedFields[input][j] = compareFields[input][j];
+                    }
+                    for (int k = 0; k < extraFields[input].length; k++) {
+                        projectedFields[input][j + k] = extraFields[input][k];
+                    }
+                }
+            }
+            this.allProjectFields = projectedFields;
             this.firstKeyNormalizerComputer =
                     firstKeyNormalizerFactory == null ? null : 
firstKeyNormalizerFactory.createNormalizedKeyComputer();
 
-            comparators = new IBinaryComparator[projectFields[0].length];
+            comparators = new IBinaryComparator[compareFields[0].length];
             for (int i = 0; i < comparators.length; i++) {
                 comparators[i] = comparatorFactory[i].createBinaryComparator();
             }
@@ -241,8 +273,9 @@ public class IntersectOperatorDescriptor extends AbstractOperatorDescriptor {
                                 continue;
                             }
                             while (tupleIndexMarker[i] < 
refAccessor[i].getTupleCount()) {
-                                int cmp = compare(i, refAccessor[i], 
tupleIndexMarker[i], maxInput,
-                                        refAccessor[maxInput], 
tupleIndexMarker[maxInput]);
+                                int cmp =
+                                        compare(i, refAccessor[i], 
tupleIndexMarker[i], maxInput, refAccessor[maxInput],
+                                                tupleIndexMarker[maxInput]);
                                 if (cmp == 0) {
                                     match++;
                                     break;
@@ -260,7 +293,7 @@ public class IntersectOperatorDescriptor extends AbstractOperatorDescriptor {
                         }
                         if (match == inputArity) {
                             FrameUtils.appendProjectionToWriter(writer, 
appender, refAccessor[maxInput],
-                                    tupleIndexMarker[maxInput], 
projectFields[maxInput]);
+                                    tupleIndexMarker[maxInput], 
allProjectFields[maxInput]);
                             for (int i = 0; i < inputArity; i++) {
                                 tupleIndexMarker[i]++;
                                 if (tupleIndexMarker[i] >= 
refAccessor[i].getTupleCount()) {
@@ -291,11 +324,11 @@ public class IntersectOperatorDescriptor extends AbstractOperatorDescriptor {
 
                     for (int i = 0; i < comparators.length; i++) {
                         int cmp = 
comparators[i].compare(frameTupleAccessor1.getBuffer().array(),
-                                
frameTupleAccessor1.getAbsoluteFieldStartOffset(tid1, projectFields[input1][i]),
-                                frameTupleAccessor1.getFieldLength(tid1, 
projectFields[input1][i]),
+                                
frameTupleAccessor1.getAbsoluteFieldStartOffset(tid1, compareFields[input1][i]),
+                                frameTupleAccessor1.getFieldLength(tid1, 
compareFields[input1][i]),
                                 frameTupleAccessor2.getBuffer().array(),
-                                
frameTupleAccessor2.getAbsoluteFieldStartOffset(tid2, projectFields[input2][i]),
-                                frameTupleAccessor2.getFieldLength(tid2, 
projectFields[input2][i]));
+                                
frameTupleAccessor2.getAbsoluteFieldStartOffset(tid2, compareFields[input2][i]),
+                                frameTupleAccessor2.getFieldLength(tid2, 
compareFields[input2][i]));
 
                         if (cmp != 0) {
                             return cmp;
@@ -308,8 +341,8 @@ public class IntersectOperatorDescriptor extends AbstractOperatorDescriptor {
                     return firstKeyNormalizerComputer == null ?
                             0 :
                             
firstKeyNormalizerComputer.normalize(frameTupleAccessor1.getBuffer().array(),
-                                    
frameTupleAccessor1.getAbsoluteFieldStartOffset(tid1, 
projectFields[inputId1][0]),
-                                    frameTupleAccessor1.getFieldLength(tid1, 
projectFields[inputId1][0]));
+                                    
frameTupleAccessor1.getAbsoluteFieldStartOffset(tid1, 
compareFields[inputId1][0]),
+                                    frameTupleAccessor1.getFieldLength(tid1, 
compareFields[inputId1][0]));
                 }
 
                 private int findMaxInput() throws HyracksDataException {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/IntersectOperatorDescriptorTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/IntersectOperatorDescriptorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/IntersectOperatorDescriptorTest.java
index 0c49588..6729713 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/IntersectOperatorDescriptorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/IntersectOperatorDescriptorTest.java
@@ -108,7 +108,7 @@ public class IntersectOperatorDescriptorTest {
     public void testNormalOperatorInitialization() throws HyracksException {
 
         IntersectOperatorDescriptor operatorDescriptor = new 
IntersectOperatorDescriptor(mockRegistry, nInputs,
-                compareFields, normalizedKeyFactory, comparatorFactory, 
outRecordDescriptor);
+                compareFields, null, normalizedKeyFactory, comparatorFactory, 
outRecordDescriptor);
 
         assertEquals(nInputs, operatorDescriptor.getInputArity());
     }
@@ -158,7 +158,7 @@ public class IntersectOperatorDescriptorTest {
     private void executeAndVerifyResult(List<IFrame>[] inputFrames, 
List<Object[]> answer) throws Exception {
         IntersectOperatorDescriptor.IntersectOperatorNodePushable pushable =
                 new 
IntersectOperatorDescriptor.IntersectOperatorNodePushable(ctx, nInputs, 
inputRecordDescriptor,
-                        compareFields, null, comparatorFactory);
+                        compareFields, null, null, comparatorFactory);
         assertEquals(nInputs, pushable.getInputArity());
 
         IFrameWriter[] writers = new IFrameWriter[nInputs];

Reply via email to