This is an automated email from the ASF dual-hosted git repository. glauesppen pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 8840bb0473c4c2c4fe1f1d0aedd529d178995e2d Author: AdityaGoel11 <[email protected]> AuthorDate: Thu Oct 5 13:02:08 2023 +0530 Creates the Logical operator for SQL Aggregations and processes basic aggregation queries for 0 or 1 grouping fields. --- .../calcite/converter/WayangAggregateVisitor.java | 372 +++++++++++++++++++++ .../sql/calcite/converter/WayangRelConverter.java | 7 +- .../api/sql/calcite/rel/WayangAggregate.java | 54 +++ .../wayang/api/sql/calcite/rules/WayangRules.java | 31 ++ .../apache/wayang/api/sql/context/SqlContext.java | 3 +- .../wayang-api-sql/src/main/resources/model.json | 6 +- .../java/org/apache/wayang/api/sql/SqlAPI.java | 23 +- .../java/org/apache/wayang/api/sql/SqlTest.java | 8 +- .../apache/wayang/api/sql/SqlToWayangRelTest.java | 3 +- .../wayang-api-sql/src/test/resources/model.json | 26 +- 10 files changed, 502 insertions(+), 31 deletions(-) diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangAggregateVisitor.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangAggregateVisitor.java new file mode 100644 index 00000000..59369fdc --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangAggregateVisitor.java @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.api.sql.calcite.converter; + +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.wayang.api.sql.calcite.rel.WayangAggregate; +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.GlobalReduceOperator; +import org.apache.wayang.basic.operators.MapOperator; +import org.apache.wayang.basic.operators.ReduceByOperator; +import org.apache.wayang.core.function.FunctionDescriptor; +import org.apache.wayang.core.function.ReduceDescriptor; +import org.apache.wayang.core.function.TransformationDescriptor; +import org.apache.wayang.core.plan.wayangplan.Operator; +import org.apache.wayang.core.types.DataUnitType; +import org.apache.wayang.core.util.Tuple; + +import java.util.List; + +public class WayangAggregateVisitor extends WayangRelNodeVisitor<WayangAggregate> { + + WayangAggregateVisitor(WayangRelConverter wayangRelConverter) { + super(wayangRelConverter); + } + + @Override + Operator visit(WayangAggregate wayangRelNode) { + Operator childOp = wayangRelConverter.convert(wayangRelNode.getInput(0)); + + List<AggregateCall> aggregateCalls = ((Aggregate) wayangRelNode).getAggCallList(); + + for (AggregateCall aggregateCall : aggregateCalls) { + if (aggregateCall.getAggregation().getName().equals("SUM")) { + int fieldIndex = aggregateCall.getArgList().get(0); + //System.out.println(fieldIndex); + int groupCount = wayangRelNode.getGroupCount(); + //System.out.println(groupCount); + if (groupCount > 0) { + int groupIndex = 0; // wayangRelNode.getGroupSets() + // Create the ReduceByOperator + ReduceByOperator<Record, Object> reduceByOperator; + reduceByOperator = new ReduceByOperator<>( + new TransformationDescriptor<>(new KeyExtractor(groupIndex), Record.class, Object.class), + new ReduceDescriptor<>(new SumFunction(fieldIndex), + DataUnitType.createGrouped(Record.class), + DataUnitType.createBasicUnchecked(Record.class)) + ); + // Connect it to the child operator + childOp.connectTo(0, reduceByOperator, 0); + return reduceByOperator; + } + else { + GlobalReduceOperator<Record> globalReduceOperator; + globalReduceOperator = new GlobalReduceOperator<>( + new ReduceDescriptor<>(new SumFunction(fieldIndex), + DataUnitType.createGrouped(Record.class), + DataUnitType.createBasicUnchecked(Record.class)) + ); + childOp.connectTo(0,globalReduceOperator,0); + return globalReduceOperator; + } + } + else if (aggregateCall.getAggregation().getName().equals("MIN")) { + int fieldIndex = aggregateCall.getArgList().get(0); + //System.out.println(fieldIndex); + int groupCount = wayangRelNode.getGroupCount(); + //System.out.println(groupCount); + if (groupCount > 0) { + int groupIndex = 0; // wayangRelNode.getGroupSets() + // Create the ReduceByOperator + ReduceByOperator<Record, Object> reduceByOperator; + reduceByOperator = new ReduceByOperator<>( + new TransformationDescriptor<>(new KeyExtractor(groupIndex), Record.class, Object.class), + new ReduceDescriptor<>(new minFunction(fieldIndex), + DataUnitType.createGrouped(Record.class), + DataUnitType.createBasicUnchecked(Record.class)) + ); + // Connect it to the child operator + childOp.connectTo(0, reduceByOperator, 0); + return reduceByOperator; + } + else { + GlobalReduceOperator<Record> globalReduceOperator; + globalReduceOperator = new GlobalReduceOperator<>( + new ReduceDescriptor<>(new minFunction(fieldIndex), + DataUnitType.createGrouped(Record.class), + DataUnitType.createBasicUnchecked(Record.class)) + ); + childOp.connectTo(0,globalReduceOperator,0); + return globalReduceOperator; + } + } + else if (aggregateCall.getAggregation().getName().equals("MAX")) { + int fieldIndex = aggregateCall.getArgList().get(0); + //System.out.println(fieldIndex); + int groupCount = wayangRelNode.getGroupCount(); + //System.out.println(groupCount); + if (groupCount > 0) { + int groupIndex = 0; // wayangRelNode.getGroupSets() + // Create the ReduceByOperator + ReduceByOperator<Record, Object> reduceByOperator; + reduceByOperator = new ReduceByOperator<>( + new TransformationDescriptor<>(new KeyExtractor(groupIndex), Record.class, Object.class), + new ReduceDescriptor<>(new maxFunction(fieldIndex), + DataUnitType.createGrouped(Record.class), + DataUnitType.createBasicUnchecked(Record.class)) + ); + // Connect it to the child operator + childOp.connectTo(0, reduceByOperator, 0); + return reduceByOperator; + } + else { + GlobalReduceOperator<Record> globalReduceOperator; + globalReduceOperator = new GlobalReduceOperator<>( + new ReduceDescriptor<>(new maxFunction(fieldIndex), + DataUnitType.createGrouped(Record.class), + DataUnitType.createBasicUnchecked(Record.class)) + ); + childOp.connectTo(0,globalReduceOperator,0); + return globalReduceOperator; + } + } + else if (aggregateCall.getAggregation().getName().equals("AVG")) { + //System.out.println(aggregateCall.getArgList()); + int fieldIndex = aggregateCall.getArgList().get(0); + //System.out.println(fieldIndex); + MapOperator mapOperator1 = new MapOperator( + new addCountCol(), + Record.class, + Record.class + ); + childOp.connectTo(0, mapOperator1, 0); + + int groupCount = wayangRelNode.getGroupCount(); + //System.out.println(groupCount); + if (groupCount > 0) { + int groupIndex = 0; // wayangRelNode.getGroupSets() + // Create the ReduceByOperator + ReduceByOperator<Record, Object> reduceByOperator; + reduceByOperator = new ReduceByOperator<>( + new TransformationDescriptor<>(new KeyExtractor(groupIndex), Record.class, Object.class), + new ReduceDescriptor<>(new avgFunction(fieldIndex), + DataUnitType.createGrouped(Record.class), + DataUnitType.createBasicUnchecked(Record.class)) + ); + // Connect it to the child operator + mapOperator1.connectTo(0, reduceByOperator, 0); + MapOperator mapOperator2 = new MapOperator( + new getAvg(fieldIndex), + Record.class, + Record.class + ); + reduceByOperator.connectTo(0,mapOperator2,0); + return mapOperator2; + } + else { + GlobalReduceOperator<Record> globalReduceOperator; + globalReduceOperator = new GlobalReduceOperator<>( + new ReduceDescriptor<>(new avgFunction(fieldIndex), + DataUnitType.createGrouped(Record.class), + DataUnitType.createBasicUnchecked(Record.class)) + ); + mapOperator1.connectTo(0,globalReduceOperator,0); + MapOperator mapOperator2 = new MapOperator( + new getAvg(fieldIndex), + Record.class, + Record.class + ); + globalReduceOperator.connectTo(0,mapOperator2,0); + return mapOperator2; + } + + } else { + throw new UnsupportedOperationException("Unsupported aggregate function: " + + aggregateCall.getAggregation().getName()); + } + } + + return childOp; + } +} +class KeyExtractor implements FunctionDescriptor.SerializableFunction<Record, Object> { + private final int index; + + public KeyExtractor(int index) { + this.index = index; + } + + public Object apply(final Record record) { + return record.getField(index); + } +} + +class KeyExtractorTuple implements FunctionDescriptor.SerializableFunction<Tuple<Record,Integer>, Object> { + private final int index; + + public KeyExtractorTuple(int index) { + this.index = index; + } + + public Object apply(final Tuple<Record,Integer> tuple) { + return tuple.field0.getField(index); + } +} + +class avgFunction implements FunctionDescriptor.SerializableBinaryOperator<Record> { + private final int fieldIndex; + public avgFunction(int fieldIndex) { + this.fieldIndex = fieldIndex; + } + @Override + public Record apply(Record record, Record record2) { + double sum = 0; + sum = record.getDouble(fieldIndex) + record2.getDouble(fieldIndex); + int l = record.size(); + int totalCount = record.getInt(l-1) + record2.getInt(l-1); + Object[] resValues = new Object[l]; + for(int i=0; i<l-1; i++){ + if(i==fieldIndex){ + resValues[i] = sum; + } + else{ + resValues[i] = record.getField(i); + } + } + resValues[l-1] = totalCount; + return new Record(resValues); + } +} +class countFunction implements FunctionDescriptor.SerializableBinaryOperator<Record> { + public countFunction() {} + @Override + public Record apply(Record record, Record record2) { + int l = record.size(); + int totalCount = record.getInt(l-1) + record2.getInt(l-1); + Object[] resValues = new Object[l]; + for(int i=0; i<l-1; i++){ + resValues[i] = record.getField(i); + } + resValues[l-1] = totalCount; + return new Record(resValues); + } +} +class minFunction implements FunctionDescriptor.SerializableBinaryOperator<Record> { + private final int fieldIndex; + public minFunction(int fieldIndex) { + this.fieldIndex = fieldIndex; + } + @Override + public Record apply(Record record, Record record2) { + if (((Number) record.getField(fieldIndex)).doubleValue() < ((Number) record2.getField(fieldIndex)).doubleValue()) { + return record; + } else { + return record2; + } + } +} + +class maxFunction implements FunctionDescriptor.SerializableBinaryOperator<Record> { + private final int fieldIndex; + public maxFunction(int fieldIndex) { + this.fieldIndex = fieldIndex; + } + @Override + public Record apply(Record record, Record record2) { + if (((Number) record.getField(fieldIndex)).doubleValue() > ((Number) record2.getField(fieldIndex)).doubleValue()) { + return record; + } else { + return record2; + } + } +} + +class SumFunction implements FunctionDescriptor.SerializableBinaryOperator<Record> { + private final int fieldIndex; + public SumFunction(int fieldIndex) { + this.fieldIndex = fieldIndex; + } + @Override + public Record apply(Record record, Record record2) { + double sum = 0; + sum = record.getDouble(fieldIndex) + record2.getDouble(fieldIndex); + //create an array storing values of resrecord + int l = record.size(); + Object[] resValues = new Object[l]; + for(int i=0; i<l; i++){ + if(i==fieldIndex){ + resValues[i] = sum; + } + else{ + resValues[i] = record.getField(i); + } + } + return new Record(resValues); + } +} + +class addCountCol implements FunctionDescriptor.SerializableFunction<Record, Record> { + public addCountCol() {} + @Override + public Record apply(final Record record) { + int l = record.size(); + int count = 1; + Object[] resValues = new Object[l+1]; + for(int i=0; i<l; i++){ + resValues[i] = record.getField(i); + } + resValues[l] = count; + return new Record(resValues); + } +} + +class removeCountCol implements FunctionDescriptor.SerializableFunction<Record, Record> { + public removeCountCol() {} + + @Override + public Record apply(final Record record) { + int l = record.size(); + int count = record.getInt(l-1); + Object[] resValues = new Object[l-1]; + resValues[0] = count; + for(int i=0; i<l-2; i++){ + resValues[i+1] = record.getField(i); + } + + return new Record(resValues); + + } +} + +class getAvg implements FunctionDescriptor.SerializableFunction<Record, Record> { + private final int fieldIndex; + public getAvg(int fieldindex) { + this.fieldIndex = fieldindex; + } + + @Override + public Record apply(final Record record) { + int l = record.size(); + int count = record.getInt(l-1); + double sum = record.getDouble(fieldIndex); + double avg = sum/count; + Object[] resValues = new Object[l-1]; + for(int i=0; i<l-1; i++){ + if(i==fieldIndex){ + resValues[i] = avg; + } + else{ + resValues[i] = record.getField(i); + } + } + return new Record(resValues); + + } +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangRelConverter.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangRelConverter.java index 9401215a..d68c3fa5 100755 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangRelConverter.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangRelConverter.java @@ -20,10 +20,7 @@ package org.apache.wayang.api.sql.calcite.converter; import org.apache.calcite.rel.RelNode; -import org.apache.wayang.api.sql.calcite.rel.WayangFilter; -import org.apache.wayang.api.sql.calcite.rel.WayangJoin; -import org.apache.wayang.api.sql.calcite.rel.WayangProject; -import org.apache.wayang.api.sql.calcite.rel.WayangTableScan; +import org.apache.wayang.api.sql.calcite.rel.*; import org.apache.wayang.core.plan.wayangplan.Operator; public class WayangRelConverter { @@ -37,6 +34,8 @@ public class WayangRelConverter { return new WayangFilterVisitor(this).visit((WayangFilter) node); } else if (node instanceof WayangJoin) { return new WayangJoinVisitor(this).visit((WayangJoin) node); + } else if (node instanceof WayangAggregate) { + return new WayangAggregateVisitor(this).visit((WayangAggregate) node); } throw new IllegalStateException("Operator translation not supported yet"); } diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rel/WayangAggregate.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rel/WayangAggregate.java new file mode 100644 index 00000000..2ddcadff --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rel/WayangAggregate.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wayang.api.sql.calcite.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.hint.RelHint; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.wayang.api.sql.calcite.convention.WayangConvention; + +import java.util.List; + +public class WayangAggregate extends Aggregate implements WayangRel { + + public WayangAggregate( + RelOptCluster cluster, + RelTraitSet traitSet, + List<RelHint> hints, + RelNode input, + ImmutableBitSet groupSet, + List<ImmutableBitSet> groupSets, + List<AggregateCall> aggCalls) { + super(cluster, traitSet, hints, input, groupSet, groupSets, aggCalls); + assert getConvention() instanceof WayangConvention; + } + + @Override + public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet, + List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) { + return new WayangAggregate(getCluster(), traitSet, hints, input, groupSet, groupSets, aggCalls); + } + + @Override + public String toString() { + return "Wayang Aggregate"; + } +} \ No newline at end of file diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rules/WayangRules.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rules/WayangRules.java index 7b0bf5fd..0aa221f4 100755 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rules/WayangRules.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rules/WayangRules.java @@ -26,6 +26,7 @@ import org.apache.calcite.rel.convert.ConverterRule; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.logical.LogicalTableScan; import org.apache.wayang.api.sql.calcite.convention.WayangConvention; @@ -33,6 +34,7 @@ import org.apache.wayang.api.sql.calcite.rel.WayangFilter; import org.apache.wayang.api.sql.calcite.rel.WayangJoin; import org.apache.wayang.api.sql.calcite.rel.WayangProject; import org.apache.wayang.api.sql.calcite.rel.WayangTableScan; +import org.apache.wayang.api.sql.calcite.rel.WayangAggregate; import org.checkerframework.checker.nullness.qual.Nullable; import java.util.ArrayList; @@ -51,6 +53,7 @@ public class WayangRules { public static final RelOptRule WAYANG_TABLESCAN_RULE = new WayangTableScanRule(WayangTableScanRule.DEFAULT_CONFIG); public static final RelOptRule WAYANG_TABLESCAN_ENUMERABLE_RULE = new WayangTableScanRule(WayangTableScanRule.ENUMERABLE_CONFIG); + public static final RelOptRule WAYANG_AGGREGATE_RULE = new WayangAggregateRule(WayangAggregateRule.DEFAULT_CONFIG); private static class WayangProjectRule extends ConverterRule { @@ -174,6 +177,34 @@ public class WayangRules { ); } } + private static class WayangAggregateRule extends ConverterRule { + + public static final Config DEFAULT_CONFIG = Config.INSTANCE + .withConversion(LogicalAggregate.class, + Convention.NONE, WayangConvention.INSTANCE, + "WayangAggregateRule") + .withRuleFactory(WayangAggregateRule::new); + + protected WayangAggregateRule(Config config) { + super(config); + } + + @Override + public @Nullable RelNode convert(RelNode relNode) { + LogicalAggregate aggregate = (LogicalAggregate) relNode; + RelNode input = convert(aggregate.getInput(), aggregate.getInput().getTraitSet().replace(WayangConvention.INSTANCE)); + + return new WayangAggregate( + aggregate.getCluster(), + aggregate.getTraitSet().replace(WayangConvention.INSTANCE), + aggregate.getHints(), + input, + aggregate.getGroupSet(), + aggregate.getGroupSets(), + aggregate.getAggCallList() + ); + } + } } diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java index dbd170c5..5899ee0b 100755 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java @@ -87,7 +87,8 @@ public class SqlContext { WayangRules.WAYANG_TABLESCAN_ENUMERABLE_RULE, WayangRules.WAYANG_PROJECT_RULE, WayangRules.WAYANG_FILTER_RULE, - WayangRules.WAYANG_JOIN_RULE + WayangRules.WAYANG_JOIN_RULE, + WayangRules.WAYANG_AGGREGATE_RULE ); RelNode wayangRel = optimizer.optimize( relNode, diff --git a/wayang-api/wayang-api-sql/src/main/resources/model.json b/wayang-api/wayang-api-sql/src/main/resources/model.json index c6df0033..4f1d5bb1 100644 --- a/wayang-api/wayang-api-sql/src/main/resources/model.json +++ b/wayang-api/wayang-api-sql/src/main/resources/model.json @@ -9,9 +9,9 @@ "factory": "org.apache.wayang.api.sql.calcite.jdbc.JdbcSchema$Factory", "operand": { "jdbcDriver": "org.postgresql.Driver", - "jdbcUrl": "jdbc:postgresql://localhost:5432/imdb", - "jdbcUser": "postgres", - "jdbcPassword": "postgres" + "jdbcUrl": "jdbc:postgresql://localhost:5432/dvdrental", + "jdbcUser": "aditya", + "jdbcPassword": "12345678" } }, { diff --git a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlAPI.java b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlAPI.java index 15283aaf..87febd3d 100755 --- a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlAPI.java +++ b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlAPI.java @@ -133,6 +133,26 @@ public class SqlAPI { printResults(10, result); } + public static void exampleAggregateWithPostgres() throws Exception { + Configuration configuration = new Configuration(); + configuration.setProperty("wayang.postgres.jdbc.url", "jdbc:postgresql://localhost:5432/dvdrental"); + configuration.setProperty("wayang.postgres.jdbc.user", "aditya"); + configuration.setProperty("wayang.postgres.jdbc.password", "12345678"); + + String calciteModel = Resources.toString( + SqlAPI.class.getResource("/model.json"), + Charset.defaultCharset()); + configuration.setProperty("wayang.calcite.model", calciteModel); + + SqlContext sqlContext = new SqlContext(configuration); + + + Collection<Record> result = sqlContext.executeSql( + "SELECT avg(amount) FROM postgres.payment" + ); + + printResults(10, result); + } public static void main(String... args) throws Exception { // BasicConfigurator.configure(); @@ -140,9 +160,10 @@ public class SqlAPI { // new SqlAPI().exampleFs(); // new SqlAPI().exampleWithPostgres(); // new SqlAPI().exampleJoinWithPostgres(); +// new SqlAPI().exampleCrossPlatform(); long startTime = System.nanoTime(); - new SqlAPI().exampleCrossPlatform(); + new SqlAPI().exampleAggregateWithPostgres(); long endTime = System.nanoTime(); long duration = (endTime - startTime); diff --git a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlTest.java b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlTest.java index 621da6c9..c1d25869 100755 --- a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlTest.java +++ b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlTest.java @@ -39,9 +39,9 @@ public class SqlTest { public static void main(String[] args) { WayangPlan wayangPlan; Configuration configuration = new Configuration(); - configuration.setProperty("wayang.postgres.jdbc.url", "jdbc:postgresql://localhost:5432/imdb"); - configuration.setProperty("wayang.postgres.jdbc.user", "postgres"); - configuration.setProperty("wayang.postgres.jdbc.password", "password"); + configuration.setProperty("wayang.postgres.jdbc.url", "jdbc:postgresql://localhost:5432/dvdrental"); + configuration.setProperty("wayang.postgres.jdbc.user", "aditya"); + configuration.setProperty("wayang.postgres.jdbc.password", "12345678"); WayangContext wayangContext = new WayangContext(configuration) .withPlugin(Java.basicPlugin()) @@ -50,7 +50,7 @@ public class SqlTest { Collection<Record> collector = new ArrayList<>(); - TableSource customer = new PostgresTableSource("person"); + TableSource customer = new PostgresTableSource("sales"); MapOperator<Record, Record> projection = MapOperator.createProjection( Record.class, Record.class, diff --git a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java index 91b27d14..471c11d5 100755 --- a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java +++ b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java @@ -84,7 +84,8 @@ public class SqlToWayangRelTest { WayangRules.WAYANG_PROJECT_RULE, WayangRules.WAYANG_FILTER_RULE, WayangRules.WAYANG_TABLESCAN_ENUMERABLE_RULE, - WayangRules.WAYANG_JOIN_RULE + WayangRules.WAYANG_JOIN_RULE, + WayangRules.WAYANG_AGGREGATE_RULE ); RelNode wayangRel = optimizer.optimize( diff --git a/wayang-api/wayang-api-sql/src/test/resources/model.json b/wayang-api/wayang-api-sql/src/test/resources/model.json index 9465c4a1..0a5ea233 100644 --- a/wayang-api/wayang-api-sql/src/test/resources/model.json +++ b/wayang-api/wayang-api-sql/src/test/resources/model.json @@ -1,24 +1,16 @@ { - "version": '1.0', - "defaultSchema": 'wayang', + "version": "1.0", + "defaultSchema": "wayang", "schemas": [ { - "name": 'postgres', - "type": 'custom', - "factory": 'org.apache.wayang.api.sql.calcite.jdbc.JdbcSchema$Factory', + "name": "postgres", + "type": "custom", + "factory": "org.apache.wayang.api.sql.calcite.jdbc.JdbcSchema$Factory", "operand": { - "jdbcDriver": 'org.postgresql.Driver', - "jdbcUrl": 'jdbc:postgresql://localhost:5432/imdb', - "jdbcUser": 'postgres', - "jdbcPassword": 'postgres' - } - }, - { - "name": 'fs', - "type": 'custom', - "factory": 'org.apache.calcite.adapter.file.FileSchemaFactory', - "operand": { - "directory": 'C:/incubator-Wayang-CrossPlatform/incubator-wayang-SQL/wayang-api/wayang-api-sql/src/test/resources' + "jdbcDriver": "org.postgresql.Driver", + "jdbcUrl": "jdbc:postgresql://localhost:5432/dvdrental", + "jdbcUser": "aditya", + "jdbcPassword": "12345678" } } ]
