zkaoudi commented on code in PR #699:
URL: https://github.com/apache/wayang/pull/699#discussion_r2936382013


##########
wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java:
##########
@@ -170,21 +172,22 @@ protected static Tuple2<String, SqlQueryChannel.Instance> 
createSqlQuery(final E
         // Extract the different types of ExecutionOperators from the stage.
         final JdbcTableSource tableOp = (JdbcTableSource) 
startTask.getOperator();
         SqlQueryChannel.Instance tipChannelInstance = 
JdbcExecutor.instantiateOutboundChannel(startTask, context, jdbcExecutor);
-        final Collection<JdbcFilterOperator> filterTasks = new ArrayList<>(4);
+        final Collection<JdbcExecutionOperator> filterTasks = new 
ArrayList<>(4);
         JdbcProjectionOperator projectionTask = null;
-        final Collection<JdbcJoinOperator<?>> joinTasks = new ArrayList<>();
+        final Collection<JdbcExecutionOperator> joinTasks = new ArrayList<>();
         final Set<ExecutionTask> allTasks = stage.getAllTasks();
         assert allTasks.size() <= 3;
         ExecutionTask nextTask = 
JdbcExecutor.findJdbcExecutionOperatorTaskInStage(startTask, stage);
         while (nextTask != null) {
             // Evaluate the nextTask.
-            if (nextTask.getOperator() instanceof final JdbcFilterOperator 
filterOperator) {
-                filterTasks.add(filterOperator);
-            } else if (nextTask.getOperator() instanceof 
JdbcProjectionOperator projectionOperator) {
+            final var operator = nextTask.getOperator();
+            if (operator instanceof JdbcFilterOperator || operator instanceof 
SpatialFilterOperator) {

Review Comment:
   And maybe then we do not need the extra condition in the if?



##########
wayang-plugins/wayang-spatial/src/test/java/org/apache/wayang/spatial/integration/PostgresSpatialIntegrationTest.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.spatial.integration;
+
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.basic.data.Tuple2;
+import org.apache.wayang.spatial.data.WayangGeometry;
+import org.apache.wayang.basic.operators.*;
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.api.spatial.SpatialPredicate;
+import org.apache.wayang.core.plan.wayangplan.WayangPlan;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.ReflectionUtils;
+import org.apache.wayang.java.Java;
+import org.apache.wayang.postgres.Postgres;
+import org.apache.wayang.postgres.operators.PostgresTableSource;
+import org.apache.wayang.spark.Spark;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.locationtech.jts.geom.Envelope;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.GeometryFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+@Disabled("Requires local Postgres test database.")

Review Comment:
   can you check if the hsqldb library we use for other jdbc tests supports 
spatial operators? Then we can have the tests without requiring a postgres 
instance setup and running.



##########
wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SpatialJoinOperator.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.basic.operators;
+
+import org.apache.wayang.basic.data.Tuple2;
+import org.apache.wayang.core.api.spatial.SpatialGeometry;
+import org.apache.wayang.core.api.spatial.SpatialPredicate;
+import org.apache.wayang.core.function.FunctionDescriptor;
+import org.apache.wayang.core.function.TransformationDescriptor;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
+import org.apache.wayang.core.plan.wayangplan.BinaryToUnaryOperator;
+import org.apache.wayang.core.types.DataSetType;
+
+public class SpatialJoinOperator<InputType0, InputType1> extends 
BinaryToUnaryOperator<InputType0, InputType1, Tuple2<InputType0, InputType1>> {

Review Comment:
   Add a comment about the operator similar to what you had done to 
spatialfilter operator



##########
wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java:
##########
@@ -170,21 +172,22 @@ protected static Tuple2<String, SqlQueryChannel.Instance> 
createSqlQuery(final E
         // Extract the different types of ExecutionOperators from the stage.
         final JdbcTableSource tableOp = (JdbcTableSource) 
startTask.getOperator();
         SqlQueryChannel.Instance tipChannelInstance = 
JdbcExecutor.instantiateOutboundChannel(startTask, context, jdbcExecutor);
-        final Collection<JdbcFilterOperator> filterTasks = new ArrayList<>(4);
+        final Collection<JdbcExecutionOperator> filterTasks = new 
ArrayList<>(4);
         JdbcProjectionOperator projectionTask = null;
-        final Collection<JdbcJoinOperator<?>> joinTasks = new ArrayList<>();
+        final Collection<JdbcExecutionOperator> joinTasks = new ArrayList<>();
         final Set<ExecutionTask> allTasks = stage.getAllTasks();
         assert allTasks.size() <= 3;
         ExecutionTask nextTask = 
JdbcExecutor.findJdbcExecutionOperatorTaskInStage(startTask, stage);
         while (nextTask != null) {
             // Evaluate the nextTask.
-            if (nextTask.getOperator() instanceof final JdbcFilterOperator 
filterOperator) {
-                filterTasks.add(filterOperator);
-            } else if (nextTask.getOperator() instanceof 
JdbcProjectionOperator projectionOperator) {
+            final var operator = nextTask.getOperator();
+            if (operator instanceof JdbcFilterOperator || operator instanceof 
SpatialFilterOperator) {

Review Comment:
   Shouldn't this be a jdbcspatialfilter operator? Why referencing the logical 
operator?



##########
wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java:
##########
@@ -170,21 +172,22 @@ protected static Tuple2<String, SqlQueryChannel.Instance> 
createSqlQuery(final E
         // Extract the different types of ExecutionOperators from the stage.
         final JdbcTableSource tableOp = (JdbcTableSource) 
startTask.getOperator();
         SqlQueryChannel.Instance tipChannelInstance = 
JdbcExecutor.instantiateOutboundChannel(startTask, context, jdbcExecutor);
-        final Collection<JdbcFilterOperator> filterTasks = new ArrayList<>(4);
+        final Collection<JdbcExecutionOperator> filterTasks = new 
ArrayList<>(4);
         JdbcProjectionOperator projectionTask = null;
-        final Collection<JdbcJoinOperator<?>> joinTasks = new ArrayList<>();
+        final Collection<JdbcExecutionOperator> joinTasks = new ArrayList<>();
         final Set<ExecutionTask> allTasks = stage.getAllTasks();
         assert allTasks.size() <= 3;
         ExecutionTask nextTask = 
JdbcExecutor.findJdbcExecutionOperatorTaskInStage(startTask, stage);
         while (nextTask != null) {
             // Evaluate the nextTask.
-            if (nextTask.getOperator() instanceof final JdbcFilterOperator 
filterOperator) {
-                filterTasks.add(filterOperator);
-            } else if (nextTask.getOperator() instanceof 
JdbcProjectionOperator projectionOperator) {
+            final var operator = nextTask.getOperator();
+            if (operator instanceof JdbcFilterOperator || operator instanceof 
SpatialFilterOperator) {
+                filterTasks.add((JdbcExecutionOperator) operator);
+            } else if (operator instanceof JdbcProjectionOperator) {
                 assert projectionTask == null; // Allow one projection 
operator per stage for now.
-                projectionTask = projectionOperator;
-            } else if (nextTask.getOperator() instanceof JdbcJoinOperator 
joinOperator) {
-                joinTasks.add(joinOperator);
+                projectionTask = (JdbcProjectionOperator) operator;
+            } else if (operator instanceof JdbcJoinOperator || (operator 
instanceof SpatialJoinOperator)) {

Review Comment:
   same here? why a special case for the spatial wayang join operator is needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to