http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearJoinRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearJoinRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearJoinRel.java new file mode 100644 index 0000000..1b168fb --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearJoinRel.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.CorrelationId; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.Pair; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import scala.Tuple2; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +public class GearJoinRel extends Join implements GearRelNode { + public GearJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, + RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) { + super(cluster, traits, left, right, condition, variablesSet, joinType); + } + + @Override + public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, + RelNode right, JoinRelType joinType, boolean semiJoinDone) { + return new GearJoinRel(getCluster(), traitSet, left, right, conditionExpr, variablesSet, + joinType); + } + + private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) { + // it's a CROSS JOIN because: condition == true + if (condition instanceof RexLiteral && (Boolean) ((RexLiteral) condition).getValue()) { + throw new UnsupportedOperationException("CROSS JOIN is not supported!"); + } + + RexCall call = (RexCall) condition; + List<Pair<Integer, Integer>> pairs = new ArrayList<>(); + if ("AND".equals(call.getOperator().getName())) { + List<RexNode> operands = call.getOperands(); + for (RexNode rexNode : operands) { + Pair<Integer, Integer> pair = extractOneJoinColumn((RexCall) rexNode, leftRowColumnCount); + pairs.add(pair); + } + } else if ("=".equals(call.getOperator().getName())) { + pairs.add(extractOneJoinColumn(call, leftRowColumnCount)); + } else { + throw new UnsupportedOperationException( + "Operator " + call.getOperator().getName() + " is not supported in join condition"); + } + + return pairs; + } + + private Pair<Integer, Integer> extractOneJoinColumn(RexCall oneCondition, + int leftRowColumnCount) { + List<RexNode> operands = oneCondition.getOperands(); + final int leftIndex = Math.min(((RexInputRef) operands.get(0)).getIndex(), + ((RexInputRef) operands.get(1)).getIndex()); + + final int rightIndex1 = Math.max(((RexInputRef) operands.get(0)).getIndex(), + ((RexInputRef) operands.get(1)).getIndex()); + final int rightIndex = rightIndex1 - leftRowColumnCount; + + return new Pair<>(leftIndex, rightIndex); + } + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + return null; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearLogicalConvention.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearLogicalConvention.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearLogicalConvention.java new file mode 100644 index 0000000..ced38c3 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearLogicalConvention.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.*; + +public enum GearLogicalConvention implements Convention { + INSTANCE; + + @Override + public Class getInterface() { + return GearRelNode.class; + } + + @Override + public String getName() { + return "GEAR_LOGICAL"; + } + + @Override + public RelTraitDef getTraitDef() { + return ConventionTraitDef.INSTANCE; + } + + @Override + public boolean satisfies(RelTrait trait) { + return this == trait; + } + + @Override + public void register(RelOptPlanner planner) { + } + + @Override + public String toString() { + return getName(); + } + + @Override + public boolean canConvertConvention(Convention toConvention) { + return false; + } + + @Override + public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) { + return false; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearMinusRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearMinusRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearMinusRel.java new file mode 100644 index 0000000..1ca972a --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearMinusRel.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Minus; +import org.apache.calcite.rel.core.SetOp; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import scala.Tuple2; + +import java.util.List; + +public class GearMinusRel extends Minus implements GearRelNode { + + private GearSetOperatorRelBase delegate; + + public GearMinusRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, + boolean all) { + super(cluster, traits, inputs, all); + delegate = new GearSetOperatorRelBase(this, GearSetOperatorRelBase.OpType.MINUS, inputs, all); + } + + @Override + public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) { + return new GearMinusRel(getCluster(), traitSet, inputs, all); + } + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearProjectRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearProjectRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearProjectRel.java new file mode 100644 index 0000000..f09dc8c --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearProjectRel.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import scala.Tuple2; + +import java.util.List; + +public class GearProjectRel extends Project implements GearRelNode { + + public GearProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, + List<? extends RexNode> projects, RelDataType rowType) { + super(cluster, traits, input, projects, rowType); + } + + @Override + public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, + RelDataType rowType) { + return new GearProjectRel(getCluster(), traitSet, input, projects, rowType); + } + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearRelNode.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearRelNode.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearRelNode.java new file mode 100644 index 0000000..042c767 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearRelNode.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.rel.RelNode; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import scala.Tuple2; + +public interface GearRelNode extends RelNode { + + JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSetOperatorRelBase.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSetOperatorRelBase.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSetOperatorRelBase.java new file mode 100644 index 0000000..ee59753 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSetOperatorRelBase.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.rel.RelNode; + +import java.io.Serializable; +import java.util.List; + +public class GearSetOperatorRelBase { + + public enum OpType implements Serializable { + UNION, + INTERSECT, + MINUS + } + + private GearRelNode gearRelNode; + private List<RelNode> inputs; + private boolean all; + private OpType opType; + + public GearSetOperatorRelBase(GearRelNode gearRelNode, OpType opType, + List<RelNode> inputs, boolean all) { + this.gearRelNode = gearRelNode; + this.opType = opType; + this.inputs = inputs; + this.all = all; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSortRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSortRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSortRel.java new file mode 100644 index 0000000..f70481b --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSortRel.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationImpl; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import scala.Tuple2; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; + +public class GearSortRel extends Sort implements GearRelNode { + + private List<Integer> fieldIndices = new ArrayList<>(); + private List<Boolean> orientation = new ArrayList<>(); + private List<Boolean> nullsFirst = new ArrayList<>(); + + private int startIndex = 0; + private int count; + + public GearSortRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RelCollation collation, + RexNode offset, RexNode fetch) { + super(cluster, traits, child, collation, offset, fetch); + + List<RexNode> fieldExps = getChildExps(); + RelCollationImpl collationImpl = (RelCollationImpl) collation; + List<RelFieldCollation> collations = collationImpl.getFieldCollations(); + for (int i = 0; i < fieldExps.size(); i++) { + RexNode fieldExp = fieldExps.get(i); + RexInputRef inputRef = (RexInputRef) fieldExp; + fieldIndices.add(inputRef.getIndex()); + orientation.add(collations.get(i).getDirection() == RelFieldCollation.Direction.ASCENDING); + + RelFieldCollation.NullDirection rawNullDirection = collations.get(i).nullDirection; + if (rawNullDirection == RelFieldCollation.NullDirection.UNSPECIFIED) { + rawNullDirection = collations.get(i).getDirection().defaultNullDirection(); + } + nullsFirst.add(rawNullDirection == RelFieldCollation.NullDirection.FIRST); + } + + if (fetch == null) { + throw new UnsupportedOperationException("ORDER BY without a LIMIT is not supported!"); + } + + RexLiteral fetchLiteral = (RexLiteral) fetch; + count = ((BigDecimal) fetchLiteral.getValue()).intValue(); + + if (offset != null) { + RexLiteral offsetLiteral = (RexLiteral) offset; + startIndex = ((BigDecimal) offsetLiteral.getValue()).intValue(); + } + } + + @Override + public Sort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation, + RexNode offset, RexNode fetch) { + return new GearSortRel(getCluster(), traitSet, newInput, newCollation, offset, fetch); + } + + public static <T extends Number & Comparable> int numberCompare(T a, T b) { + return a.compareTo(b); + } + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSqlRelUtils.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSqlRelUtils.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSqlRelUtils.java new file mode 100644 index 0000000..54a6bbb --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSqlRelUtils.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.sql.SqlExplainLevel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicInteger; + +class GearSqlRelUtils { + private static final Logger LOG = LoggerFactory.getLogger(GearSqlRelUtils.class); + + private static final AtomicInteger sequence = new AtomicInteger(0); + private static final AtomicInteger classSequence = new AtomicInteger(0); + + public static String getStageName(GearRelNode relNode) { + return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_" + + sequence.getAndIncrement(); + } + + public static String getClassName(GearRelNode relNode) { + return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + + "_" + classSequence.getAndIncrement(); + } + + public static GearRelNode getGearRelInput(RelNode input) { + if (input instanceof RelSubset) { + // go with known best input + input = ((RelSubset) input).getBest(); + } + return (GearRelNode) input; + } + + public static String explain(final RelNode rel) { + return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES); + } + + public static String explain(final RelNode rel, SqlExplainLevel detailLevel) { + String explain = ""; + try { + explain = RelOptUtil.toString(rel); + } catch (StackOverflowError e) { + LOG.error("StackOverflowError occurred while extracting plan. " + + "Please report it to the dev@ mailing list."); + LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e); + LOG.error("Forcing plan to empty string and continue... " + + "SQL Runner may not working properly after."); + } + return explain; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearUnionRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearUnionRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearUnionRel.java new file mode 100644 index 0000000..431368d --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearUnionRel.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelInput; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.SetOp; +import org.apache.calcite.rel.core.Union; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import scala.Tuple2; + +import java.util.List; + +public class GearUnionRel extends Union implements GearRelNode { + + private GearSetOperatorRelBase delegate; + + public GearUnionRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) { + super(cluster, traits, inputs, all); + this.delegate = new GearSetOperatorRelBase(this, GearSetOperatorRelBase.OpType.UNION, inputs, all); + } + + public GearUnionRel(RelInput input) { + super(input); + } + + @Override + public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) { + return new GearUnionRel(getCluster(), traitSet, inputs, all); + } + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearValuesRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearValuesRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearValuesRel.java new file mode 100644 index 0000000..6bd9403 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearValuesRel.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.core.Values; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexLiteral; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import scala.Tuple2; + +public class GearValuesRel extends Values implements GearRelNode { + + public GearValuesRel(RelOptCluster cluster, RelDataType rowType, ImmutableList<ImmutableList<RexLiteral>> tuples, + RelTraitSet traits) { + super(cluster, rowType, tuples, traits); + } + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearAggregationRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearAggregationRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearAggregationRule.java new file mode 100644 index 0000000..c1b1602 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearAggregationRule.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.*; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.tools.RelBuilderFactory; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.gearpump.sql.rel.GearAggregationRel; +import org.apache.gearpump.sql.rel.GearLogicalConvention; +import org.apache.gearpump.sql.utils.GearConfiguration; +import org.apache.gearpump.streaming.dsl.window.api.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.GregorianCalendar; +import java.util.List; + +public class GearAggregationRule extends RelOptRule { + + private static final Logger LOG = LoggerFactory.getLogger(GearAggregationRule.class); + public static final GearAggregationRule INSTANCE = + new GearAggregationRule(Aggregate.class, Project.class, RelFactories.LOGICAL_BUILDER); + + public GearAggregationRule(Class<? extends Aggregate> aggregateClass, + Class<? extends Project> projectClass, + RelBuilderFactory relBuilderFactory) { + super(operand(aggregateClass, operand(projectClass, any())), relBuilderFactory, null); + } + + public GearAggregationRule(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final Aggregate aggregate = call.rel(0); + final Project project = call.rel(1); + updateWindowTrigger(call, aggregate, project); + } + + private void updateWindowTrigger(RelOptRuleCall call, Aggregate aggregate, Project project) { + ImmutableBitSet groupByFields = aggregate.getGroupSet(); + List<RexNode> projectMapping = project.getProjects(); + + WindowFunction windowFn = new GlobalWindowFunction(); + Trigger triggerFn; + int windowFieldIdx = -1; + Duration allowedLatence = Duration.ZERO; + + for (int groupField : groupByFields.asList()) { + RexNode projNode = projectMapping.get(groupField); + if (projNode instanceof RexCall) { + SqlOperator op = ((RexCall) projNode).op; + ImmutableList<RexNode> parameters = ((RexCall) projNode).operands; + String functionName = op.getName(); + switch (functionName) { + case "TUMBLE": + windowFieldIdx = groupField; + windowFn = (WindowFunction) FixedWindows.apply(Duration.ofMillis(getWindowParameterAsMillis(parameters.get(1)))); + if (parameters.size() == 3) { + GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2)) + .getValue(); + triggerFn = createTriggerWithDelay(delayTime); + allowedLatence = (Duration.ofMillis(delayTime.getTimeInMillis())); + } + break; + case "HOP": + windowFieldIdx = groupField; + windowFn = (WindowFunction) SlidingWindows.apply(Duration.ofMillis(getWindowParameterAsMillis(parameters.get(1))), Duration.ofMillis(getWindowParameterAsMillis(parameters.get(2)))); + + if (parameters.size() == 4) { + GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(3)) + .getValue(); + triggerFn = createTriggerWithDelay(delayTime); + allowedLatence = (Duration.ofMillis(delayTime.getTimeInMillis())); + } + break; + case "SESSION": + windowFieldIdx = groupField; + windowFn = (WindowFunction) SessionWindows.apply(Duration.ofMillis(getWindowParameterAsMillis(parameters.get(1)))); + if (parameters.size() == 3) { + GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2)) + .getValue(); + triggerFn = createTriggerWithDelay(delayTime); + allowedLatence = (Duration.ofMillis(delayTime.getTimeInMillis())); + } + break; + default: + break; + } + } + } + + try { + GearAggregationRel gearRel = new GearAggregationRel(aggregate.getCluster(), + aggregate.getTraitSet().replace(GearLogicalConvention.INSTANCE), + convert(aggregate.getInput(), + aggregate.getInput().getTraitSet().replace(GearLogicalConvention.INSTANCE)), + aggregate.indicator, + aggregate.getGroupSet(), + aggregate.getGroupSets(), + aggregate.getAggCallList()); + gearRel.buildGearPipeline(GearConfiguration.app, null); + GearConfiguration.app.submit().waitUntilFinish(); + } catch (Exception e) { + LOG.error(e.getMessage()); + } + + } + + private Trigger createTriggerWithDelay(GregorianCalendar delayTime) { + return null; + } + + private long getWindowParameterAsMillis(RexNode parameterNode) { + if (parameterNode instanceof RexLiteral) { + return RexLiteral.intValue(parameterNode); + } else { + throw new IllegalArgumentException(String.format("[%s] is not valid.", parameterNode)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFilterRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFilterRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFilterRule.java new file mode 100644 index 0000000..4817ee8 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFilterRule.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.gearpump.sql.rel.GearFilterRel; +import org.apache.gearpump.sql.rel.GearLogicalConvention; + +public class GearFilterRule extends ConverterRule { + + public static final GearFilterRule INSTANCE = new GearFilterRule(); + + private GearFilterRule() { + super(LogicalFilter.class, Convention.NONE, GearLogicalConvention.INSTANCE, "GearFilterRule"); + } + + @Override + public RelNode convert(RelNode rel) { + final Filter filter = (Filter) rel; + final RelNode input = filter.getInput(); + + GearFilterRel gearRel = new GearFilterRel(filter.getCluster(), + filter.getTraitSet().replace(GearLogicalConvention.INSTANCE), + convert(input, input.getTraitSet().replace(GearLogicalConvention.INSTANCE)), + filter.getCondition()); + return gearRel; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFlatMapRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFlatMapRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFlatMapRule.java new file mode 100644 index 0000000..e81f948 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFlatMapRule.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelTrait; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.gearpump.sql.rel.GearLogicalConvention; +import org.apache.gearpump.sql.rel.GearFlatMapRel; +import org.apache.gearpump.sql.utils.GearConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GearFlatMapRule extends ConverterRule { + + private static final Logger LOG = LoggerFactory.getLogger(GearFlatMapRule.class); + public static final GearFlatMapRule INSTANCE = new GearFlatMapRule(Aggregate.class, Convention.NONE); + + public GearFlatMapRule(Class<? extends Aggregate> aggregateClass, RelTrait projectIn) { + super(aggregateClass, projectIn, GearLogicalConvention.INSTANCE, "GearFlatMapRule"); + } + + @Override + public RelNode convert(RelNode rel) { + try { + GearFlatMapRel flatRel = new GearFlatMapRel(); + flatRel.buildGearPipeline(GearConfiguration.app, null); + } catch (Exception e) { + LOG.error(e.getMessage()); + } + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSinkRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSinkRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSinkRule.java new file mode 100644 index 0000000..ca525d6 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSinkRule.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import java.util.List; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.TableModify; +import org.apache.calcite.rel.logical.LogicalTableModify; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.schema.Table; +import org.apache.gearpump.sql.rel.GearIOSinkRel; +import org.apache.gearpump.sql.rel.GearLogicalConvention; + +public class GearIOSinkRule extends ConverterRule { + + public static final GearIOSinkRule INSTANCE = new GearIOSinkRule(); + + private GearIOSinkRule() { + super(LogicalTableModify.class, Convention.NONE, GearLogicalConvention.INSTANCE, + "GearIOSinkRule"); + } + + @Override + public RelNode convert(RelNode rel) { + final TableModify tableModify = (TableModify) rel; + final RelNode input = tableModify.getInput(); + + final RelOptCluster cluster = tableModify.getCluster(); + final RelTraitSet traitSet = tableModify.getTraitSet().replace(GearLogicalConvention.INSTANCE); + final RelOptTable relOptTable = tableModify.getTable(); + final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader(); + final RelNode convertedInput = convert(input, + input.getTraitSet().replace(GearLogicalConvention.INSTANCE)); + final TableModify.Operation operation = tableModify.getOperation(); + final List<String> updateColumnList = tableModify.getUpdateColumnList(); + final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList(); + final boolean flattened = tableModify.isFlattened(); + + final Table table = tableModify.getTable().unwrap(Table.class); + + switch (table.getJdbcTableType()) { + case TABLE: + case STREAM: + if (operation != TableModify.Operation.INSERT) { + throw new UnsupportedOperationException( + String.format("Streams doesn't support %s modify operation", operation)); + } + return new GearIOSinkRel(cluster, traitSet, + relOptTable, catalogReader, convertedInput, operation, updateColumnList, + sourceExpressionList, flattened); + default: + throw new IllegalArgumentException( + String.format("Unsupported table type: %s", table.getJdbcTableType())); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSourceRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSourceRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSourceRule.java new file mode 100644 index 0000000..a725b1a --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSourceRule.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.gearpump.sql.rel.GearIOSourceRel; +import org.apache.gearpump.sql.rel.GearLogicalConvention; + +public class GearIOSourceRule extends ConverterRule { + + public static final GearIOSourceRule INSTANCE = new GearIOSourceRule(); + + private GearIOSourceRule() { + super(LogicalTableScan.class, Convention.NONE, GearLogicalConvention.INSTANCE, + "GearIOSourceRule"); + } + + @Override + public RelNode convert(RelNode rel) { + final TableScan scan = (TableScan) rel; + + return new GearIOSourceRel(scan.getCluster(), + scan.getTraitSet().replace(GearLogicalConvention.INSTANCE), scan.getTable()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIntersectRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIntersectRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIntersectRule.java new file mode 100644 index 0000000..eb149f3 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIntersectRule.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.logical.LogicalIntersect; +import org.apache.gearpump.sql.rel.GearIntersectRel; +import org.apache.gearpump.sql.rel.GearLogicalConvention; + +import java.util.List; + +public class GearIntersectRule extends ConverterRule { + + public static final GearIntersectRule INSTANCE = new GearIntersectRule(); + + private GearIntersectRule() { + super(LogicalIntersect.class, Convention.NONE, + GearLogicalConvention.INSTANCE, "GearIntersectRule"); + } + + @Override + public RelNode convert(RelNode rel) { + Intersect intersect = (Intersect) rel; + final List<RelNode> inputs = intersect.getInputs(); + return new GearIntersectRel( + intersect.getCluster(), + intersect.getTraitSet().replace(GearLogicalConvention.INSTANCE), + convertList(inputs, GearLogicalConvention.INSTANCE), + intersect.all + ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearJoinRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearJoinRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearJoinRule.java new file mode 100644 index 0000000..e86db06 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearJoinRule.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.gearpump.sql.rel.GearJoinRel; +import org.apache.gearpump.sql.rel.GearLogicalConvention; + +public class GearJoinRule extends ConverterRule { + + public static final GearJoinRule INSTANCE = new GearJoinRule(); + + private GearJoinRule() { + super(LogicalJoin.class, Convention.NONE, + GearLogicalConvention.INSTANCE, "GearJoinRule"); + } + + @Override + public RelNode convert(RelNode rel) { + Join join = (Join) rel; + return new GearJoinRel( + join.getCluster(), + join.getTraitSet().replace(GearLogicalConvention.INSTANCE), + convert(join.getLeft(), + join.getLeft().getTraitSet().replace(GearLogicalConvention.INSTANCE)), + convert(join.getRight(), + join.getRight().getTraitSet().replace(GearLogicalConvention.INSTANCE)), + join.getCondition(), + join.getVariablesSet(), + join.getJoinType() + ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearMinusRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearMinusRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearMinusRule.java new file mode 100644 index 0000000..103a29d --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearMinusRule.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Minus; +import org.apache.calcite.rel.logical.LogicalMinus; +import org.apache.gearpump.sql.rel.GearLogicalConvention; +import org.apache.gearpump.sql.rel.GearMinusRel; + +import java.util.List; + +public class GearMinusRule extends ConverterRule { + + public static final GearMinusRule INSTANCE = new GearMinusRule(); + + private GearMinusRule() { + super(LogicalMinus.class, Convention.NONE, + GearLogicalConvention.INSTANCE, "GearMinusRule"); + } + + @Override + public RelNode convert(RelNode rel) { + Minus minus = (Minus) rel; + final List<RelNode> inputs = minus.getInputs(); + return new GearMinusRel( + minus.getCluster(), + minus.getTraitSet().replace(GearLogicalConvention.INSTANCE), + convertList(inputs, GearLogicalConvention.INSTANCE), + minus.all + ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearProjectRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearProjectRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearProjectRule.java new file mode 100644 index 0000000..6b09550 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearProjectRule.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.gearpump.sql.rel.GearLogicalConvention; +import org.apache.gearpump.sql.rel.GearProjectRel; + +public class GearProjectRule extends ConverterRule { + + public static final GearProjectRule INSTANCE = new GearProjectRule(); + + private GearProjectRule() { + super(LogicalProject.class, Convention.NONE, GearLogicalConvention.INSTANCE, + "GearProjectRule"); + } + + @Override + public RelNode convert(RelNode rel) { + final Project project = (Project) rel; + final RelNode input = project.getInput(); + + return new GearProjectRel(project.getCluster(), + project.getTraitSet().replace(GearLogicalConvention.INSTANCE), + convert(input, input.getTraitSet().replace(GearLogicalConvention.INSTANCE)), + project.getProjects(), project.getRowType()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearSortRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearSortRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearSortRule.java new file mode 100644 index 0000000..0a0d9e4 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearSortRule.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.logical.LogicalSort; +import org.apache.gearpump.sql.rel.GearLogicalConvention; +import org.apache.gearpump.sql.rel.GearSortRel; + +public class GearSortRule extends ConverterRule { + + public static final GearSortRule INSTANCE = new GearSortRule(); + + private GearSortRule() { + super(LogicalSort.class, Convention.NONE, + GearLogicalConvention.INSTANCE, "GearSortRule"); + } + + @Override + public RelNode convert(RelNode rel) { + Sort sort = (Sort) rel; + final RelNode input = sort.getInput(); + return new GearSortRel( + sort.getCluster(), + sort.getTraitSet().replace(GearLogicalConvention.INSTANCE), + convert(input, input.getTraitSet().replace(GearLogicalConvention.INSTANCE)), + sort.getCollation(), + sort.offset, + sort.fetch + ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearUnionRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearUnionRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearUnionRule.java new file mode 100644 index 0000000..7a17a46 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearUnionRule.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Union; +import org.apache.calcite.rel.logical.LogicalUnion; +import org.apache.gearpump.sql.rel.GearLogicalConvention; +import org.apache.gearpump.sql.rel.GearUnionRel; + +public class GearUnionRule extends ConverterRule { + + public static final GearUnionRule INSTANCE = new GearUnionRule(); + + private GearUnionRule() { + super(LogicalUnion.class, Convention.NONE, GearLogicalConvention.INSTANCE, + "GearUnionRule"); + } + + @Override + public RelNode convert(RelNode rel) { + Union union = (Union) rel; + + return new GearUnionRel( + union.getCluster(), + union.getTraitSet().replace(GearLogicalConvention.INSTANCE), + convertList(union.getInputs(), GearLogicalConvention.INSTANCE), + union.all + ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearValuesRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearValuesRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearValuesRule.java new file mode 100644 index 0000000..b04eec2 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearValuesRule.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Values; +import org.apache.calcite.rel.logical.LogicalValues; +import org.apache.gearpump.sql.rel.GearLogicalConvention; +import org.apache.gearpump.sql.rel.GearValuesRel; + +public class GearValuesRule extends ConverterRule { + + public static final GearValuesRule INSTANCE = new GearValuesRule(); + + private GearValuesRule() { + super(LogicalValues.class, Convention.NONE, + GearLogicalConvention.INSTANCE, "GearValuesRule"); + } + + @Override + public RelNode convert(RelNode rel) { + Values values = (Values) rel; + return new GearValuesRel( + values.getCluster(), + values.getRowType(), + values.getTuples(), + values.getTraitSet().replace(GearLogicalConvention.INSTANCE) + ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleString.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleString.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleString.java new file mode 100644 index 0000000..7ecba21 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleString.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.table; + +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; + +public class SampleString { + + public static JavaStream<String> WORDS; + + public static class Stream { + public static final Message[] KV = {new Message("001", "This is a good start, bingo!! bingo!!")}; + + public static String getKV() { + return KV[0].WORD; + } + } + + public static class Message { + public final String ID; + public final String WORD; + + public Message(String ID, String WORD) { + this.ID = ID; + this.WORD = WORD; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleTransactions.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleTransactions.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleTransactions.java new file mode 100644 index 0000000..4aa20e0 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleTransactions.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.table; + +public class SampleTransactions { + + public static class Transactions { + + public final Order[] ORDERS = { + new Order("001", 3), + new Order("002", 5), + new Order("003", 8), + new Order("004", 15), + }; + + public final Product[] PRODUCTS = { + new Product("001", "Book"), + new Product("002", "Pen"), + new Product("003", "Pencil"), + new Product("004", "Ruler"), + }; + } + + public static class Order { + public final String ID; + public final int QUANTITY; + + public Order(String ID, int QUANTITY) { + this.ID = ID; + this.QUANTITY = QUANTITY; + } + } + + public static class Product { + public final String ID; + public final String NAME; + + public Product(String ID, String NAME) { + this.ID = ID; + this.NAME = NAME; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/table/TransactionsTableFactory.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/table/TransactionsTableFactory.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/TransactionsTableFactory.java new file mode 100644 index 0000000..4ff9efd --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/TransactionsTableFactory.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.table; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.*; +import org.apache.calcite.sql.type.SqlTypeName; + +import java.util.Map; + +public class TransactionsTableFactory implements TableFactory<Table> { + + @Override + public Table create(SchemaPlus schema, String name, Map<String, Object> operand, RelDataType rowType) { + final Object[][] rows = { + {100, "I001", "item1", 3}, + {101, "I002", "item2", 5}, + {102, "I003", "item3", 8}, + {103, "I004", "item4", 33}, + {104, "I005", "item5", 23} + }; + + return new TransactionsTable(ImmutableList.copyOf(rows)); + } + + public static class TransactionsTable implements ScannableTable { + + protected final RelProtoDataType protoRowType = new RelProtoDataType() { + public RelDataType apply(RelDataTypeFactory a0) { + return a0.builder() + .add("timeStamp", SqlTypeName.TIMESTAMP) + .add("id", SqlTypeName.VARCHAR, 10) + .add("item", SqlTypeName.VARCHAR, 50) + .add("quantity", SqlTypeName.INTEGER) + .build(); + } + }; + + private final ImmutableList<Object[]> rows; + + public TransactionsTable(ImmutableList<Object[]> rows) { + this.rows = rows; + } + + public Enumerable<Object[]> scan(DataContext root) { + return Linq4j.asEnumerable(rows); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return protoRowType.apply(typeFactory); + } + + @Override + public Statistic getStatistic() { + return Statistics.UNKNOWN; + } + + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.TABLE; + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/CalciteFrameworkConfiguration.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/CalciteFrameworkConfiguration.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/CalciteFrameworkConfiguration.java new file mode 100644 index 0000000..a63036d --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/CalciteFrameworkConfiguration.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.utils; + +import org.apache.calcite.config.Lex; +import org.apache.calcite.plan.Contexts; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.RuleSets; + +import java.util.ArrayList; +import java.util.List; + +public class CalciteFrameworkConfiguration { + + public static FrameworkConfig getDefaultconfig(SchemaPlus schema) { + final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>(); + + traitDefs.add(ConventionTraitDef.INSTANCE); + traitDefs.add(RelCollationTraitDef.INSTANCE); + + FrameworkConfig frameworkConfiguration = Frameworks.newConfigBuilder() + .parserConfig(SqlParser.configBuilder() + .setLex(Lex.JAVA) + .build()) + .defaultSchema(schema) + .traitDefs(traitDefs) + .context(Contexts.EMPTY_CONTEXT) + .ruleSets(RuleSets.ofList()) + .costFactory(null) + .typeSystem(RelDataTypeSystem.DEFAULT) + .build(); + + return frameworkConfiguration; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/GearConfiguration.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/GearConfiguration.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/GearConfiguration.java new file mode 100644 index 0000000..03b2a47 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/GearConfiguration.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.utils; + +import com.typesafe.config.Config; +import org.apache.gearpump.cluster.ClusterConfig; +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.cluster.client.ClientContext; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; + +public class GearConfiguration { + + private Config akkaConf; + private ClientContext context; + public static JavaStreamApp app; + + public void defaultConfiguration() { + setAkkaConf(ClusterConfig.defaultConfig()); + setContext(ClientContext.apply(akkaConf)); + } + + public void ConfigJavaStreamApp() { + app = new JavaStreamApp("JavaDSL", context, UserConfig.empty()); + } + + public void setAkkaConf(Config akkaConf) { + this.akkaConf = akkaConf; + } + + public void setContext(ClientContext context) { + this.context = context; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/validator/CalciteSqlValidator.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/validator/CalciteSqlValidator.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/validator/CalciteSqlValidator.java new file mode 100644 index 0000000..d3d723f --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/validator/CalciteSqlValidator.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.validator; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlInsert; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.validate.SqlConformance; +import org.apache.calcite.sql.validate.SqlValidatorImpl; + +public class CalciteSqlValidator extends SqlValidatorImpl { + public CalciteSqlValidator(SqlOperatorTable opTab, + CalciteCatalogReader catalogReader, JavaTypeFactory typeFactory, + SqlConformance conformance) { + super(opTab, catalogReader, typeFactory, conformance); + } + + @Override + protected RelDataType getLogicalSourceRowType( + RelDataType sourceRowType, SqlInsert insert) { + final RelDataType superType = + super.getLogicalSourceRowType(sourceRowType, insert); + return ((JavaTypeFactory) typeFactory).toSql(superType); + } + + @Override + protected RelDataType getLogicalTargetRowType( + RelDataType targetRowType, SqlInsert insert) { + final RelDataType superType = + super.getLogicalTargetRowType(targetRowType, insert); + return ((JavaTypeFactory) typeFactory).toSql(superType); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/scala/org/apache/gearpump/experiments/sql/Connection.scala ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/scala/org/apache/gearpump/experiments/sql/Connection.scala b/experiments/sql/src/main/scala/org/apache/gearpump/experiments/sql/Connection.scala index 8fae091..ee55aa8 100644 --- a/experiments/sql/src/main/scala/org/apache/gearpump/experiments/sql/Connection.scala +++ b/experiments/sql/src/main/scala/org/apache/gearpump/experiments/sql/Connection.scala @@ -13,9 +13,6 @@ import org.apache.calcite.linq4j.tree.Expression import org.apache.calcite.linq4j.{Enumerator, Queryable} import org.apache.log4j.Logger -/** - * Created by Buddhi on 6/8/2017. - */ class Connection extends CalciteConnection { import org.apache.calcite.schema.SchemaPlus
