This is an automated email from the ASF dual-hosted git repository.

glauesppen pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 8840bb0473c4c2c4fe1f1d0aedd529d178995e2d
Author: AdityaGoel11 <[email protected]>
AuthorDate: Thu Oct 5 13:02:08 2023 +0530

    Creates the Logical operator for SQL Aggregations and processes basic 
aggregation queries for 0 or 1 grouping fields.
---
 .../calcite/converter/WayangAggregateVisitor.java  | 372 +++++++++++++++++++++
 .../sql/calcite/converter/WayangRelConverter.java  |   7 +-
 .../api/sql/calcite/rel/WayangAggregate.java       |  54 +++
 .../wayang/api/sql/calcite/rules/WayangRules.java  |  31 ++
 .../apache/wayang/api/sql/context/SqlContext.java  |   3 +-
 .../wayang-api-sql/src/main/resources/model.json   |   6 +-
 .../java/org/apache/wayang/api/sql/SqlAPI.java     |  23 +-
 .../java/org/apache/wayang/api/sql/SqlTest.java    |   8 +-
 .../apache/wayang/api/sql/SqlToWayangRelTest.java  |   3 +-
 .../wayang-api-sql/src/test/resources/model.json   |  26 +-
 10 files changed, 502 insertions(+), 31 deletions(-)

diff --git 
a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangAggregateVisitor.java
 
b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangAggregateVisitor.java
new file mode 100644
index 00000000..59369fdc
--- /dev/null
+++ 
b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangAggregateVisitor.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.sql.calcite.converter;
+
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.wayang.api.sql.calcite.rel.WayangAggregate;
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.basic.operators.GlobalReduceOperator;
+import org.apache.wayang.basic.operators.MapOperator;
+import org.apache.wayang.basic.operators.ReduceByOperator;
+import org.apache.wayang.core.function.FunctionDescriptor;
+import org.apache.wayang.core.function.ReduceDescriptor;
+import org.apache.wayang.core.function.TransformationDescriptor;
+import org.apache.wayang.core.plan.wayangplan.Operator;
+import org.apache.wayang.core.types.DataUnitType;
+import org.apache.wayang.core.util.Tuple;
+
+import java.util.List;
+
+public class WayangAggregateVisitor extends 
WayangRelNodeVisitor<WayangAggregate> {
+
+    WayangAggregateVisitor(WayangRelConverter wayangRelConverter) {
+        super(wayangRelConverter);
+    }
+
+    @Override
+    Operator visit(WayangAggregate wayangRelNode) {
+        Operator childOp = 
wayangRelConverter.convert(wayangRelNode.getInput(0));
+
+        List<AggregateCall> aggregateCalls = ((Aggregate) 
wayangRelNode).getAggCallList();
+
+        for (AggregateCall aggregateCall : aggregateCalls) {
+            if (aggregateCall.getAggregation().getName().equals("SUM")) {
+                int fieldIndex = aggregateCall.getArgList().get(0);
+                //System.out.println(fieldIndex);
+                int groupCount = wayangRelNode.getGroupCount();
+                //System.out.println(groupCount);
+                if (groupCount > 0) {
+                    int groupIndex = 0; // wayangRelNode.getGroupSets()
+                    // Create the ReduceByOperator
+                    ReduceByOperator<Record, Object> reduceByOperator;
+                    reduceByOperator = new ReduceByOperator<>(
+                            new TransformationDescriptor<>(new 
KeyExtractor(groupIndex), Record.class, Object.class),
+                            new ReduceDescriptor<>(new SumFunction(fieldIndex),
+                                    DataUnitType.createGrouped(Record.class),
+                                    
DataUnitType.createBasicUnchecked(Record.class))
+                    );
+                    // Connect it to the child operator
+                    childOp.connectTo(0, reduceByOperator, 0);
+                    return reduceByOperator;
+                }
+                else {
+                    GlobalReduceOperator<Record> globalReduceOperator;
+                    globalReduceOperator = new GlobalReduceOperator<>(
+                            new ReduceDescriptor<>(new SumFunction(fieldIndex),
+                                    DataUnitType.createGrouped(Record.class),
+                                    
DataUnitType.createBasicUnchecked(Record.class))
+                    );
+                    childOp.connectTo(0,globalReduceOperator,0);
+                    return globalReduceOperator;
+                }
+            }
+            else if (aggregateCall.getAggregation().getName().equals("MIN")) {
+                int fieldIndex = aggregateCall.getArgList().get(0);
+                //System.out.println(fieldIndex);
+                int groupCount = wayangRelNode.getGroupCount();
+                //System.out.println(groupCount);
+                if (groupCount > 0) {
+                    int groupIndex = 0; // wayangRelNode.getGroupSets()
+                    // Create the ReduceByOperator
+                    ReduceByOperator<Record, Object> reduceByOperator;
+                    reduceByOperator = new ReduceByOperator<>(
+                            new TransformationDescriptor<>(new 
KeyExtractor(groupIndex), Record.class, Object.class),
+                            new ReduceDescriptor<>(new minFunction(fieldIndex),
+                                    DataUnitType.createGrouped(Record.class),
+                                    
DataUnitType.createBasicUnchecked(Record.class))
+                    );
+                    // Connect it to the child operator
+                    childOp.connectTo(0, reduceByOperator, 0);
+                    return reduceByOperator;
+                }
+                else {
+                    GlobalReduceOperator<Record> globalReduceOperator;
+                    globalReduceOperator = new GlobalReduceOperator<>(
+                            new ReduceDescriptor<>(new minFunction(fieldIndex),
+                                    DataUnitType.createGrouped(Record.class),
+                                    
DataUnitType.createBasicUnchecked(Record.class))
+                    );
+                    childOp.connectTo(0,globalReduceOperator,0);
+                    return globalReduceOperator;
+                }
+            }
+            else if (aggregateCall.getAggregation().getName().equals("MAX")) {
+                int fieldIndex = aggregateCall.getArgList().get(0);
+                //System.out.println(fieldIndex);
+                int groupCount = wayangRelNode.getGroupCount();
+                //System.out.println(groupCount);
+                if (groupCount > 0) {
+                    int groupIndex = 0; // wayangRelNode.getGroupSets()
+                    // Create the ReduceByOperator
+                    ReduceByOperator<Record, Object> reduceByOperator;
+                    reduceByOperator = new ReduceByOperator<>(
+                            new TransformationDescriptor<>(new 
KeyExtractor(groupIndex), Record.class, Object.class),
+                            new ReduceDescriptor<>(new maxFunction(fieldIndex),
+                                    DataUnitType.createGrouped(Record.class),
+                                    
DataUnitType.createBasicUnchecked(Record.class))
+                    );
+                    // Connect it to the child operator
+                    childOp.connectTo(0, reduceByOperator, 0);
+                    return reduceByOperator;
+                }
+                else {
+                    GlobalReduceOperator<Record> globalReduceOperator;
+                    globalReduceOperator = new GlobalReduceOperator<>(
+                            new ReduceDescriptor<>(new maxFunction(fieldIndex),
+                                    DataUnitType.createGrouped(Record.class),
+                                    
DataUnitType.createBasicUnchecked(Record.class))
+                    );
+                    childOp.connectTo(0,globalReduceOperator,0);
+                    return globalReduceOperator;
+                }
+            }
+            else if (aggregateCall.getAggregation().getName().equals("AVG")) {
+                //System.out.println(aggregateCall.getArgList());
+                int fieldIndex = aggregateCall.getArgList().get(0);
+                //System.out.println(fieldIndex);
+                MapOperator mapOperator1 = new MapOperator(
+                        new addCountCol(),
+                        Record.class,
+                        Record.class
+                );
+                childOp.connectTo(0, mapOperator1, 0);
+
+                int groupCount = wayangRelNode.getGroupCount();
+                //System.out.println(groupCount);
+                if (groupCount > 0) {
+                    int groupIndex = 0; // wayangRelNode.getGroupSets()
+                    // Create the ReduceByOperator
+                    ReduceByOperator<Record, Object> reduceByOperator;
+                    reduceByOperator = new ReduceByOperator<>(
+                            new TransformationDescriptor<>(new 
KeyExtractor(groupIndex), Record.class, Object.class),
+                            new ReduceDescriptor<>(new avgFunction(fieldIndex),
+                                    DataUnitType.createGrouped(Record.class),
+                                    
DataUnitType.createBasicUnchecked(Record.class))
+                    );
+                    // Connect it to the child operator
+                    mapOperator1.connectTo(0, reduceByOperator, 0);
+                    MapOperator mapOperator2 = new MapOperator(
+                            new getAvg(fieldIndex),
+                            Record.class,
+                            Record.class
+                    );
+                    reduceByOperator.connectTo(0,mapOperator2,0);
+                    return mapOperator2;
+                }
+                else {
+                    GlobalReduceOperator<Record> globalReduceOperator;
+                    globalReduceOperator = new GlobalReduceOperator<>(
+                            new ReduceDescriptor<>(new avgFunction(fieldIndex),
+                                    DataUnitType.createGrouped(Record.class),
+                                    
DataUnitType.createBasicUnchecked(Record.class))
+                    );
+                    mapOperator1.connectTo(0,globalReduceOperator,0);
+                    MapOperator mapOperator2 = new MapOperator(
+                            new getAvg(fieldIndex),
+                            Record.class,
+                            Record.class
+                    );
+                    globalReduceOperator.connectTo(0,mapOperator2,0);
+                    return mapOperator2;
+                }
+
+            } else {
+                throw new UnsupportedOperationException("Unsupported aggregate 
function: " +
+                        aggregateCall.getAggregation().getName());
+            }
+        }
+
+        return childOp;
+    }
+}
+class KeyExtractor implements FunctionDescriptor.SerializableFunction<Record, 
Object> {
+    private final int index;
+
+    public KeyExtractor(int index) {
+        this.index = index;
+    }
+
+    public Object apply(final Record record) {
+        return record.getField(index);
+    }
+}
+
+class KeyExtractorTuple implements 
FunctionDescriptor.SerializableFunction<Tuple<Record,Integer>, Object> {
+    private final int index;
+
+    public KeyExtractorTuple(int index) {
+        this.index = index;
+    }
+
+    public Object apply(final Tuple<Record,Integer> tuple) {
+        return tuple.field0.getField(index);
+    }
+}
+
+class avgFunction implements 
FunctionDescriptor.SerializableBinaryOperator<Record> {
+    private final int fieldIndex;
+    public avgFunction(int fieldIndex) {
+        this.fieldIndex = fieldIndex;
+    }
+    @Override
+    public Record apply(Record record, Record record2) {
+        double sum = 0;
+        sum = record.getDouble(fieldIndex) + record2.getDouble(fieldIndex);
+        int l = record.size();
+        int totalCount =  record.getInt(l-1) + record2.getInt(l-1);
+        Object[] resValues = new Object[l];
+        for(int i=0; i<l-1; i++){
+            if(i==fieldIndex){
+                resValues[i] = sum;
+            }
+            else{
+                resValues[i] = record.getField(i);
+            }
+        }
+        resValues[l-1] = totalCount;
+        return new Record(resValues);
+    }
+}
+class countFunction implements 
FunctionDescriptor.SerializableBinaryOperator<Record> {
+    public countFunction() {}
+    @Override
+    public Record apply(Record record, Record record2) {
+        int l = record.size();
+        int totalCount =  record.getInt(l-1) + record2.getInt(l-1);
+        Object[] resValues = new Object[l];
+        for(int i=0; i<l-1; i++){
+            resValues[i] = record.getField(i);
+        }
+        resValues[l-1] = totalCount;
+        return new Record(resValues);
+    }
+}
+class minFunction implements 
FunctionDescriptor.SerializableBinaryOperator<Record> {
+    private final int fieldIndex;
+    public minFunction(int fieldIndex) {
+        this.fieldIndex = fieldIndex;
+    }
+    @Override
+    public Record apply(Record record, Record record2) {
+        if (((Number) record.getField(fieldIndex)).doubleValue() < ((Number) 
record2.getField(fieldIndex)).doubleValue()) {
+            return record;
+        } else {
+            return record2;
+        }
+    }
+}
+
+class maxFunction implements 
FunctionDescriptor.SerializableBinaryOperator<Record> {
+    private final int fieldIndex;
+    public maxFunction(int fieldIndex) {
+        this.fieldIndex = fieldIndex;
+    }
+    @Override
+    public Record apply(Record record, Record record2) {
+        if (((Number) record.getField(fieldIndex)).doubleValue() > ((Number) 
record2.getField(fieldIndex)).doubleValue()) {
+            return record;
+        } else {
+            return record2;
+        }
+    }
+}
+
+class SumFunction implements 
FunctionDescriptor.SerializableBinaryOperator<Record> {
+    private final int fieldIndex;
+    public SumFunction(int fieldIndex) {
+        this.fieldIndex = fieldIndex;
+    }
+    @Override
+    public Record apply(Record record, Record record2) {
+        double sum = 0;
+        sum = record.getDouble(fieldIndex) + record2.getDouble(fieldIndex);
+        //create an array storing values of resrecord
+        int l = record.size();
+        Object[] resValues = new Object[l];
+        for(int i=0; i<l; i++){
+            if(i==fieldIndex){
+                resValues[i] = sum;
+            }
+            else{
+                resValues[i] = record.getField(i);
+            }
+        }
+        return new Record(resValues);
+    }
+}
+
+class addCountCol implements FunctionDescriptor.SerializableFunction<Record, 
Record> {
+    public addCountCol() {}
+    @Override
+    public Record apply(final Record record) {
+        int l = record.size();
+        int count = 1;
+        Object[] resValues = new Object[l+1];
+        for(int i=0; i<l; i++){
+            resValues[i] = record.getField(i);
+        }
+        resValues[l] = count;
+        return new Record(resValues);
+    }
+}
+
+class removeCountCol implements 
FunctionDescriptor.SerializableFunction<Record, Record> {
+    public removeCountCol() {}
+
+    @Override
+    public Record apply(final Record record) {
+        int l = record.size();
+        int count = record.getInt(l-1);
+        Object[] resValues = new Object[l-1];
+        resValues[0] = count;
+        for(int i=0; i<l-2; i++){
+            resValues[i+1] = record.getField(i);
+        }
+
+        return new Record(resValues);
+
+    }
+}
+
+class getAvg implements FunctionDescriptor.SerializableFunction<Record, 
Record> {
+    private final int fieldIndex;
+    public getAvg(int fieldindex) {
+        this.fieldIndex = fieldindex;
+    }
+
+    @Override
+    public Record apply(final Record record) {
+        int l = record.size();
+        int count = record.getInt(l-1);
+        double sum = record.getDouble(fieldIndex);
+        double avg = sum/count;
+        Object[] resValues = new Object[l-1];
+        for(int i=0; i<l-1; i++){
+            if(i==fieldIndex){
+                resValues[i] = avg;
+            }
+            else{
+                resValues[i] = record.getField(i);
+            }
+        }
+        return new Record(resValues);
+
+    }
+}
diff --git 
a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangRelConverter.java
 
b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangRelConverter.java
index 9401215a..d68c3fa5 100755
--- 
a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangRelConverter.java
+++ 
b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangRelConverter.java
@@ -20,10 +20,7 @@
 package org.apache.wayang.api.sql.calcite.converter;
 
 import org.apache.calcite.rel.RelNode;
-import org.apache.wayang.api.sql.calcite.rel.WayangFilter;
-import org.apache.wayang.api.sql.calcite.rel.WayangJoin;
-import org.apache.wayang.api.sql.calcite.rel.WayangProject;
-import org.apache.wayang.api.sql.calcite.rel.WayangTableScan;
+import org.apache.wayang.api.sql.calcite.rel.*;
 import org.apache.wayang.core.plan.wayangplan.Operator;
 
 public class WayangRelConverter {
@@ -37,6 +34,8 @@ public class WayangRelConverter {
             return new WayangFilterVisitor(this).visit((WayangFilter) node);
         } else if (node instanceof WayangJoin) {
             return new WayangJoinVisitor(this).visit((WayangJoin) node);
+        } else if (node instanceof WayangAggregate) {
+            return new WayangAggregateVisitor(this).visit((WayangAggregate) 
node);
         }
         throw new IllegalStateException("Operator translation not supported 
yet");
     }
diff --git 
a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rel/WayangAggregate.java
 
b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rel/WayangAggregate.java
new file mode 100644
index 00000000..2ddcadff
--- /dev/null
+++ 
b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rel/WayangAggregate.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.api.sql.calcite.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.wayang.api.sql.calcite.convention.WayangConvention;
+
+import java.util.List;
+
+public class WayangAggregate extends Aggregate implements WayangRel {
+
+    public WayangAggregate(
+            RelOptCluster cluster,
+            RelTraitSet traitSet,
+            List<RelHint> hints,
+            RelNode input,
+            ImmutableBitSet groupSet,
+            List<ImmutableBitSet> groupSets,
+            List<AggregateCall> aggCalls) {
+        super(cluster, traitSet, hints, input, groupSet, groupSets, aggCalls);
+        assert getConvention() instanceof WayangConvention;
+    }
+
+    @Override
+    public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet 
groupSet,
+                          List<ImmutableBitSet> groupSets, List<AggregateCall> 
aggCalls) {
+        return new WayangAggregate(getCluster(), traitSet, hints, input, 
groupSet, groupSets, aggCalls);
+    }
+
+    @Override
+    public String toString() {
+        return "Wayang Aggregate";
+    }
+}
\ No newline at end of file
diff --git 
a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rules/WayangRules.java
 
b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rules/WayangRules.java
index 7b0bf5fd..0aa221f4 100755
--- 
a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rules/WayangRules.java
+++ 
b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rules/WayangRules.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.convert.ConverterRule;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.logical.LogicalTableScan;
 import org.apache.wayang.api.sql.calcite.convention.WayangConvention;
@@ -33,6 +34,7 @@ import org.apache.wayang.api.sql.calcite.rel.WayangFilter;
 import org.apache.wayang.api.sql.calcite.rel.WayangJoin;
 import org.apache.wayang.api.sql.calcite.rel.WayangProject;
 import org.apache.wayang.api.sql.calcite.rel.WayangTableScan;
+import org.apache.wayang.api.sql.calcite.rel.WayangAggregate;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 import java.util.ArrayList;
@@ -51,6 +53,7 @@ public class WayangRules {
     public static final RelOptRule WAYANG_TABLESCAN_RULE = new 
WayangTableScanRule(WayangTableScanRule.DEFAULT_CONFIG);
     public static final RelOptRule WAYANG_TABLESCAN_ENUMERABLE_RULE =
             new WayangTableScanRule(WayangTableScanRule.ENUMERABLE_CONFIG);
+    public static final RelOptRule WAYANG_AGGREGATE_RULE =  new 
WayangAggregateRule(WayangAggregateRule.DEFAULT_CONFIG);
 
 
     private static class WayangProjectRule extends ConverterRule {
@@ -174,6 +177,34 @@ public class WayangRules {
             );
         }
     }
+    private static class WayangAggregateRule extends ConverterRule {
+
+        public static final Config DEFAULT_CONFIG = Config.INSTANCE
+                .withConversion(LogicalAggregate.class,
+                        Convention.NONE, WayangConvention.INSTANCE,
+                        "WayangAggregateRule")
+                .withRuleFactory(WayangAggregateRule::new);
+
+        protected WayangAggregateRule(Config config) {
+            super(config);
+        }
+
+        @Override
+        public @Nullable RelNode convert(RelNode relNode) {
+            LogicalAggregate aggregate = (LogicalAggregate) relNode;
+            RelNode input = convert(aggregate.getInput(), 
aggregate.getInput().getTraitSet().replace(WayangConvention.INSTANCE));
+
+            return new WayangAggregate(
+                    aggregate.getCluster(),
+                    aggregate.getTraitSet().replace(WayangConvention.INSTANCE),
+                    aggregate.getHints(),
+                    input,
+                    aggregate.getGroupSet(),
+                    aggregate.getGroupSets(),
+                    aggregate.getAggCallList()
+            );
+        }
+    }
 
 
 }
diff --git 
a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java
 
b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java
index dbd170c5..5899ee0b 100755
--- 
a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java
+++ 
b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java
@@ -87,7 +87,8 @@ public class SqlContext {
                 WayangRules.WAYANG_TABLESCAN_ENUMERABLE_RULE,
                 WayangRules.WAYANG_PROJECT_RULE,
                 WayangRules.WAYANG_FILTER_RULE,
-                WayangRules.WAYANG_JOIN_RULE
+                WayangRules.WAYANG_JOIN_RULE,
+                WayangRules.WAYANG_AGGREGATE_RULE
         );
         RelNode wayangRel = optimizer.optimize(
                 relNode,
diff --git a/wayang-api/wayang-api-sql/src/main/resources/model.json 
b/wayang-api/wayang-api-sql/src/main/resources/model.json
index c6df0033..4f1d5bb1 100644
--- a/wayang-api/wayang-api-sql/src/main/resources/model.json
+++ b/wayang-api/wayang-api-sql/src/main/resources/model.json
@@ -9,9 +9,9 @@
         "factory": "org.apache.wayang.api.sql.calcite.jdbc.JdbcSchema$Factory",
         "operand": {
           "jdbcDriver": "org.postgresql.Driver",
-          "jdbcUrl": "jdbc:postgresql://localhost:5432/imdb",
-          "jdbcUser": "postgres",
-          "jdbcPassword": "postgres"
+          "jdbcUrl": "jdbc:postgresql://localhost:5432/dvdrental",
+          "jdbcUser": "aditya",
+          "jdbcPassword": "12345678"
         }
       },
       {
diff --git 
a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlAPI.java 
b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlAPI.java
index 15283aaf..87febd3d 100755
--- 
a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlAPI.java
+++ 
b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlAPI.java
@@ -133,6 +133,26 @@ public class SqlAPI {
         printResults(10, result);
     }
 
+    public static void exampleAggregateWithPostgres() throws Exception {
+        Configuration configuration = new Configuration();
+        configuration.setProperty("wayang.postgres.jdbc.url", 
"jdbc:postgresql://localhost:5432/dvdrental");
+        configuration.setProperty("wayang.postgres.jdbc.user", "aditya");
+        configuration.setProperty("wayang.postgres.jdbc.password", "12345678");
+
+        String calciteModel = Resources.toString(
+                SqlAPI.class.getResource("/model.json"),
+                Charset.defaultCharset());
+        configuration.setProperty("wayang.calcite.model", calciteModel);
+
+        SqlContext sqlContext = new SqlContext(configuration);
+
+
+        Collection<Record> result = sqlContext.executeSql(
+                "SELECT avg(amount) FROM postgres.payment"
+        );
+
+        printResults(10, result);
+    }
 
     public static void main(String... args) throws Exception {
 //        BasicConfigurator.configure();
@@ -140,9 +160,10 @@ public class SqlAPI {
 //        new SqlAPI().exampleFs();
 //        new SqlAPI().exampleWithPostgres();
 //        new SqlAPI().exampleJoinWithPostgres();
+//        new SqlAPI().exampleCrossPlatform();
         long startTime = System.nanoTime();
 
-        new SqlAPI().exampleCrossPlatform();
+        new SqlAPI().exampleAggregateWithPostgres();
 
         long endTime = System.nanoTime();
         long duration = (endTime - startTime);
diff --git 
a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlTest.java
 
b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlTest.java
index 621da6c9..c1d25869 100755
--- 
a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlTest.java
+++ 
b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlTest.java
@@ -39,9 +39,9 @@ public class SqlTest {
     public static void main(String[] args) {
         WayangPlan wayangPlan;
         Configuration configuration = new Configuration();
-        configuration.setProperty("wayang.postgres.jdbc.url", 
"jdbc:postgresql://localhost:5432/imdb");
-        configuration.setProperty("wayang.postgres.jdbc.user", "postgres");
-        configuration.setProperty("wayang.postgres.jdbc.password", "password");
+        configuration.setProperty("wayang.postgres.jdbc.url", 
"jdbc:postgresql://localhost:5432/dvdrental");
+        configuration.setProperty("wayang.postgres.jdbc.user", "aditya");
+        configuration.setProperty("wayang.postgres.jdbc.password", "12345678");
 
         WayangContext wayangContext = new WayangContext(configuration)
                 .withPlugin(Java.basicPlugin())
@@ -50,7 +50,7 @@ public class SqlTest {
 
         Collection<Record> collector = new ArrayList<>();
 
-        TableSource customer = new PostgresTableSource("person");
+        TableSource customer = new PostgresTableSource("sales");
         MapOperator<Record, Record> projection = MapOperator.createProjection(
                 Record.class,
                 Record.class,
diff --git 
a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java
 
b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java
index 91b27d14..471c11d5 100755
--- 
a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java
+++ 
b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java
@@ -84,7 +84,8 @@ public class SqlToWayangRelTest {
                 WayangRules.WAYANG_PROJECT_RULE,
                 WayangRules.WAYANG_FILTER_RULE,
                 WayangRules.WAYANG_TABLESCAN_ENUMERABLE_RULE,
-                WayangRules.WAYANG_JOIN_RULE
+                WayangRules.WAYANG_JOIN_RULE,
+                WayangRules.WAYANG_AGGREGATE_RULE
         );
 
         RelNode wayangRel = optimizer.optimize(
diff --git a/wayang-api/wayang-api-sql/src/test/resources/model.json 
b/wayang-api/wayang-api-sql/src/test/resources/model.json
index 9465c4a1..0a5ea233 100644
--- a/wayang-api/wayang-api-sql/src/test/resources/model.json
+++ b/wayang-api/wayang-api-sql/src/test/resources/model.json
@@ -1,24 +1,16 @@
 {
-  "version": '1.0',
-  "defaultSchema": 'wayang',
+  "version": "1.0",
+  "defaultSchema": "wayang",
   "schemas": [
     {
-      "name": 'postgres',
-      "type": 'custom',
-      "factory": 'org.apache.wayang.api.sql.calcite.jdbc.JdbcSchema$Factory',
+      "name": "postgres",
+      "type": "custom",
+      "factory": "org.apache.wayang.api.sql.calcite.jdbc.JdbcSchema$Factory",
       "operand": {
-        "jdbcDriver": 'org.postgresql.Driver',
-        "jdbcUrl": 'jdbc:postgresql://localhost:5432/imdb',
-        "jdbcUser": 'postgres',
-        "jdbcPassword": 'postgres'
-      }
-    },
-    {
-      "name": 'fs',
-      "type": 'custom',
-      "factory": 'org.apache.calcite.adapter.file.FileSchemaFactory',
-      "operand": {
-        "directory": 
'C:/incubator-Wayang-CrossPlatform/incubator-wayang-SQL/wayang-api/wayang-api-sql/src/test/resources'
+      "jdbcDriver": "org.postgresql.Driver",
+      "jdbcUrl": "jdbc:postgresql://localhost:5432/dvdrental",
+      "jdbcUser": "aditya",
+      "jdbcPassword": "12345678"
       }
     }
   ]

Reply via email to