http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java deleted file mode 100644 index 702381d..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.planner; - -import org.apache.calcite.sql.util.SqlShuttle; - -/** - * Unsupported operation to visit a RelNode. - * - */ -public class UnsupportedOperatorsVisitor extends SqlShuttle { - -}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/planner/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/package-info.java deleted file mode 100644 index d98c584..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/package-info.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * {@link org.beam.dsls.sql.planner.BeamQueryPlanner} is the main interface. - * It defines data sources, validate a SQL statement, and convert it as a Beam - * pipeline. 
- */ -package org.beam.dsls.sql.planner; http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamFilterRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamFilterRel.java deleted file mode 100644 index 64f2d1f..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamFilterRel.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.beam.dsls.sql.rel; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Filter; -import org.apache.calcite.rex.RexNode; -import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor; -import org.beam.dsls.sql.interpreter.BeamSQLSpELExecutor; -import org.beam.dsls.sql.planner.BeamPipelineCreator; -import org.beam.dsls.sql.planner.BeamSQLRelUtils; -import org.beam.dsls.sql.schema.BeamSQLRow; -import org.beam.dsls.sql.transform.BeamSQLFilterFn; - -/** - * BeamRelNode to replace a {@code Filter} node. - * - */ -public class BeamFilterRel extends Filter implements BeamRelNode { - - public BeamFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, - RexNode condition) { - super(cluster, traits, child, condition); - } - - @Override - public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) { - return new BeamFilterRel(getCluster(), traitSet, input, condition); - } - - @Override - public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception { - - RelNode input = getInput(); - BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator); - - String stageName = BeamSQLRelUtils.getStageName(this); - - PCollection<BeamSQLRow> upstream = planCreator.getLatestStream(); - - BeamSQLExpressionExecutor executor = new BeamSQLSpELExecutor(this); - - PCollection<BeamSQLRow> projectStream = upstream.apply(stageName, - ParDo.of(new BeamSQLFilterFn(getRelTypeName(), executor))); - - planCreator.setLatestStream(projectStream); - - return planCreator.getPipeline(); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSinkRel.java ---------------------------------------------------------------------- 
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSinkRel.java deleted file mode 100644 index 46654e5..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSinkRel.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.rel; - -import com.google.common.base.Joiner; -import java.util.List; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.prepare.Prepare; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.TableModify; -import org.apache.calcite.rex.RexNode; -import org.beam.dsls.sql.planner.BeamPipelineCreator; -import org.beam.dsls.sql.planner.BeamSQLRelUtils; -import org.beam.dsls.sql.schema.BaseBeamTable; -import org.beam.dsls.sql.schema.BeamSQLRow; - -/** - * BeamRelNode to replace a {@code TableModify} node. 
- * - */ -public class BeamIOSinkRel extends TableModify implements BeamRelNode { - public BeamIOSinkRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, - Prepare.CatalogReader catalogReader, RelNode child, Operation operation, - List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) { - super(cluster, traits, table, catalogReader, child, operation, updateColumnList, - sourceExpressionList, flattened); - } - - @Override - public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { - return new BeamIOSinkRel(getCluster(), traitSet, getTable(), getCatalogReader(), sole(inputs), - getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened()); - } - - @Override - public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception { - - RelNode input = getInput(); - BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator); - - String stageName = BeamSQLRelUtils.getStageName(this); - - PCollection<BeamSQLRow> upstream = planCreator.getLatestStream(); - - String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); - - BaseBeamTable targetTable = planCreator.getSourceTables().get(sourceName); - - upstream.apply(stageName, targetTable.buildIOWriter()); - - planCreator.setHasPersistent(true); - - return planCreator.getPipeline(); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSourceRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSourceRel.java deleted file mode 100644 index f14db92..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSourceRel.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.rel; - -import com.google.common.base.Joiner; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.core.TableScan; -import org.beam.dsls.sql.planner.BeamPipelineCreator; -import org.beam.dsls.sql.planner.BeamSQLRelUtils; -import org.beam.dsls.sql.schema.BaseBeamTable; -import org.beam.dsls.sql.schema.BeamSQLRow; - -/** - * BeamRelNode to replace a {@code TableScan} node. 
- * - */ -public class BeamIOSourceRel extends TableScan implements BeamRelNode { - - public BeamIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) { - super(cluster, traitSet, table); - } - - @Override - public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception { - - String sourceName = Joiner.on('.').join(getTable().getQualifiedName()).replace(".(STREAM)", ""); - - BaseBeamTable sourceTable = planCreator.getSourceTables().get(sourceName); - - String stageName = BeamSQLRelUtils.getStageName(this); - - PCollection<BeamSQLRow> sourceStream = planCreator.getPipeline().apply(stageName, - sourceTable.buildIOReader()); - - planCreator.setLatestStream(sourceStream); - - return planCreator.getPipeline(); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java deleted file mode 100644 index 50fe8e0..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.rel; - -import org.apache.calcite.plan.Convention; -import org.apache.calcite.plan.ConventionTraitDef; -import org.apache.calcite.plan.RelOptPlanner; -import org.apache.calcite.plan.RelTrait; -import org.apache.calcite.plan.RelTraitDef; -import org.apache.calcite.plan.RelTraitSet; - -/** - * Convertion for Beam SQL. - * - */ -public enum BeamLogicalConvention implements Convention { - INSTANCE; - - @Override - public Class getInterface() { - return BeamRelNode.class; - } - - @Override - public String getName() { - return "BEAM_LOGICAL"; - } - - @Override - public RelTraitDef getTraitDef() { - return ConventionTraitDef.INSTANCE; - } - - @Override - public boolean satisfies(RelTrait trait) { - return this == trait; - } - - @Override - public void register(RelOptPlanner planner) { - } - - @Override - public String toString() { - return getName(); - } - - @Override - public boolean canConvertConvention(Convention toConvention) { - return false; - } - - @Override - public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) { - return false; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java deleted file mode 100644 index e41d74e..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.rel; - -import java.util.List; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexLiteral; -import org.apache.calcite.rex.RexNode; -import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor; -import org.beam.dsls.sql.interpreter.BeamSQLSpELExecutor; -import org.beam.dsls.sql.planner.BeamPipelineCreator; -import org.beam.dsls.sql.planner.BeamSQLRelUtils; -import org.beam.dsls.sql.schema.BeamSQLRecordType; -import org.beam.dsls.sql.schema.BeamSQLRow; -import org.beam.dsls.sql.transform.BeamSQLProjectFn; - -/** - * BeamRelNode to replace a {@code Project} node. - * - */ -public class BeamProjectRel extends Project implements BeamRelNode { - - /** - * projects: {@link RexLiteral}, {@link RexInputRef}, {@link RexCall}. - * - */ - public BeamProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, - List<? 
extends RexNode> projects, RelDataType rowType) { - super(cluster, traits, input, projects, rowType); - } - - @Override - public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, - RelDataType rowType) { - return new BeamProjectRel(getCluster(), traitSet, input, projects, rowType); - } - - @Override - public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception { - RelNode input = getInput(); - BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator); - - String stageName = BeamSQLRelUtils.getStageName(this); - - PCollection<BeamSQLRow> upstream = planCreator.getLatestStream(); - - BeamSQLExpressionExecutor executor = new BeamSQLSpELExecutor(this); - - PCollection<BeamSQLRow> projectStream = upstream.apply(stageName, ParDo - .of(new BeamSQLProjectFn(getRelTypeName(), executor, BeamSQLRecordType.from(rowType)))); - - planCreator.setLatestStream(projectStream); - - return planCreator.getPipeline(); - - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java deleted file mode 100644 index 07ffee5..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.rel; - -import org.apache.beam.sdk.Pipeline; -import org.apache.calcite.rel.RelNode; -import org.beam.dsls.sql.planner.BeamPipelineCreator; - -/** - * A new method {@link #buildBeamPipeline(BeamPipelineCreator)} is added, it's - * called by {@link BeamPipelineCreator}. - * - */ -public interface BeamRelNode extends RelNode { - - /** - * A {@link BeamRelNode} is a recursive structure, the - * {@link BeamPipelineCreator} visits it with a DFS(Depth-First-Search) - * algorithm. - * - */ - Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception; -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java deleted file mode 100644 index 13dc962..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * BeamSQL specified nodes, to replace {@link org.apache.calcite.rel.RelNode}. - * - */ -package org.beam.dsls.sql.rel; http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java deleted file mode 100644 index 2ad7c07..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.beam.dsls.sql.rule; - -import org.apache.calcite.plan.Convention; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.convert.ConverterRule; -import org.apache.calcite.rel.core.Filter; -import org.apache.calcite.rel.logical.LogicalFilter; -import org.beam.dsls.sql.rel.BeamFilterRel; -import org.beam.dsls.sql.rel.BeamLogicalConvention; - -/** - * A {@code ConverterRule} to replace {@link Filter} with {@link BeamFilterRel}. - * - */ -public class BeamFilterRule extends ConverterRule { - public static final BeamFilterRule INSTANCE = new BeamFilterRule(); - - private BeamFilterRule() { - super(LogicalFilter.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamFilterRule"); - } - - @Override - public RelNode convert(RelNode rel) { - final Filter filter = (Filter) rel; - final RelNode input = filter.getInput(); - - return new BeamFilterRel(filter.getCluster(), - filter.getTraitSet().replace(BeamLogicalConvention.INSTANCE), - convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)), - filter.getCondition()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java deleted file mode 100644 index a44c002..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.rule; - -import java.util.List; -import org.apache.calcite.plan.Convention; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.prepare.Prepare; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.convert.ConverterRule; -import org.apache.calcite.rel.core.TableModify; -import org.apache.calcite.rel.logical.LogicalTableModify; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.schema.Table; -import org.beam.dsls.sql.rel.BeamIOSinkRel; -import org.beam.dsls.sql.rel.BeamLogicalConvention; - -/** - * A {@code ConverterRule} to replace {@link TableModify} with - * {@link BeamIOSinkRel}. 
- * - */ -public class BeamIOSinkRule extends ConverterRule { - public static final BeamIOSinkRule INSTANCE = new BeamIOSinkRule(); - - private BeamIOSinkRule() { - super(LogicalTableModify.class, Convention.NONE, BeamLogicalConvention.INSTANCE, - "BeamIOSinkRule"); - } - - @Override - public RelNode convert(RelNode rel) { - final TableModify tableModify = (TableModify) rel; - final RelNode input = tableModify.getInput(); - - final RelOptCluster cluster = tableModify.getCluster(); - final RelTraitSet traitSet = tableModify.getTraitSet().replace(BeamLogicalConvention.INSTANCE); - final RelOptTable relOptTable = tableModify.getTable(); - final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader(); - final RelNode convertedInput = convert(input, - input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)); - final TableModify.Operation operation = tableModify.getOperation(); - final List<String> updateColumnList = tableModify.getUpdateColumnList(); - final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList(); - final boolean flattened = tableModify.isFlattened(); - - final Table table = tableModify.getTable().unwrap(Table.class); - - switch (table.getJdbcTableType()) { - case TABLE: - case STREAM: - if (operation != TableModify.Operation.INSERT) { - throw new UnsupportedOperationException( - String.format("Streams doesn't support %s modify operation", operation)); - } - return new BeamIOSinkRel(cluster, traitSet, - relOptTable, catalogReader, convertedInput, operation, updateColumnList, - sourceExpressionList, flattened); - default: - throw new IllegalArgumentException( - String.format("Unsupported table type: %s", table.getJdbcTableType())); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java 
b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java deleted file mode 100644 index 9e4778b..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.rule; - -import org.apache.calcite.plan.Convention; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.convert.ConverterRule; -import org.apache.calcite.rel.core.TableScan; -import org.apache.calcite.rel.logical.LogicalTableScan; -import org.beam.dsls.sql.rel.BeamIOSourceRel; -import org.beam.dsls.sql.rel.BeamLogicalConvention; - -/** - * A {@code ConverterRule} to replace {@link TableScan} with - * {@link BeamIOSourceRel}. 
- * - */ -public class BeamIOSourceRule extends ConverterRule { - public static final BeamIOSourceRule INSTANCE = new BeamIOSourceRule(); - - private BeamIOSourceRule() { - super(LogicalTableScan.class, Convention.NONE, BeamLogicalConvention.INSTANCE, - "BeamIOSourceRule"); - } - - @Override - public RelNode convert(RelNode rel) { - final TableScan scan = (TableScan) rel; - - return new BeamIOSourceRel(scan.getCluster(), - scan.getTraitSet().replace(BeamLogicalConvention.INSTANCE), scan.getTable()); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java deleted file mode 100644 index 117a056..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.beam.dsls.sql.rule; - -import org.apache.calcite.plan.Convention; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.convert.ConverterRule; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.logical.LogicalProject; -import org.beam.dsls.sql.rel.BeamLogicalConvention; -import org.beam.dsls.sql.rel.BeamProjectRel; - -/** - * A {@code ConverterRule} to replace {@link Project} with - * {@link BeamProjectRel}. - * - */ -public class BeamProjectRule extends ConverterRule { - public static final BeamProjectRule INSTANCE = new BeamProjectRule(); - - private BeamProjectRule() { - super(LogicalProject.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamProjectRule"); - } - - @Override - public RelNode convert(RelNode rel) { - final Project project = (Project) rel; - final RelNode input = project.getInput(); - - return new BeamProjectRel(project.getCluster(), - project.getTraitSet().replace(BeamLogicalConvention.INSTANCE), - convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)), - project.getProjects(), project.getRowType()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java deleted file mode 100644 index 56ddcf3..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * {@link org.apache.calcite.plan.RelOptRule} to generate {@link org.beam.dsls.sql.rel.BeamRelNode}. - */ -package org.beam.dsls.sql.rule; http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java deleted file mode 100644 index 3816063..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.schema; - -import java.io.Serializable; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.calcite.DataContext; -import org.apache.calcite.linq4j.Enumerable; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.schema.ScannableTable; -import org.apache.calcite.schema.Schema.TableType; -import org.apache.calcite.schema.Statistic; -import org.apache.calcite.schema.Statistics; -import org.beam.dsls.sql.planner.BeamQueryPlanner; - -/** - * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}. - */ -public abstract class BaseBeamTable implements ScannableTable, Serializable { - - /** - * - */ - private static final long serialVersionUID = -1262988061830914193L; - private RelDataType relDataType; - - protected BeamSQLRecordType beamSqlRecordType; - - public BaseBeamTable(RelProtoDataType protoRowType) { - this.relDataType = protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY); - this.beamSqlRecordType = BeamSQLRecordType.from(relDataType); - } - - /** - * In Beam SQL, there's no difference between a batch query and a streaming - * query. {@link BeamIOType} is used to validate the sources. - */ - public abstract BeamIOType getSourceType(); - - /** - * create a {@code IO.read()} instance to read from source. - * - */ - public abstract PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader(); - - /** - * create a {@code IO.write()} instance to write to target. - * - */ - public abstract PTransform<? 
super PCollection<BeamSQLRow>, PDone> buildIOWriter(); - - @Override - public Enumerable<Object[]> scan(DataContext root) { - // not used as Beam SQL uses its own execution engine - return null; - } - - @Override - public RelDataType getRowType(RelDataTypeFactory typeFactory) { - return relDataType; - } - - /** - * Not used {@link Statistic} to optimize the plan. - */ - @Override - public Statistic getStatistic() { - return Statistics.UNKNOWN; - } - - /** - * all sources are treated as TABLE in Beam SQL. - */ - @Override - public TableType getJdbcTableType() { - return TableType.TABLE; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java deleted file mode 100644 index 5e55b0f..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * Kind of a source IO; it determines whether the processing is a batch process
 * or a streaming process.
 */
public enum BeamIOType implements Serializable {
  /** Source of a batch process. */
  BOUNDED,

  /** Source of a streaming process. */
  UNBOUNDED
}
- * - */ -//@DefaultCoder(BeamSQLRecordTypeCoder.class) -public class BeamSQLRecordType implements Serializable { - /** - * - */ - private static final long serialVersionUID = -5318734648766104712L; - private List<String> fieldsName = new ArrayList<>(); - private List<SqlTypeName> fieldsType = new ArrayList<>(); - - public static BeamSQLRecordType from(RelDataType tableInfo) { - BeamSQLRecordType record = new BeamSQLRecordType(); - for (RelDataTypeField f : tableInfo.getFieldList()) { - record.fieldsName.add(f.getName()); - record.fieldsType.add(f.getType().getSqlTypeName()); - } - return record; - } - - public int size() { - return fieldsName.size(); - } - - public List<String> getFieldsName() { - return fieldsName; - } - - public void setFieldsName(List<String> fieldsName) { - this.fieldsName = fieldsName; - } - - public List<SqlTypeName> getFieldsType() { - return fieldsType; - } - - public void setFieldsType(List<SqlTypeName> fieldsType) { - this.fieldsType = fieldsType; - } - - @Override - public String toString() { - return "RecordType [fieldsName=" + fieldsName + ", fieldsType=" + fieldsType + "]"; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java deleted file mode 100644 index 2989cb9..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.schema; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.List; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.StandardCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * A {@link Coder} for {@link BeamSQLRecordType}. 
- * - */ -public class BeamSQLRecordTypeCoder extends StandardCoder<BeamSQLRecordType> { - private static final StringUtf8Coder stringCoder = StringUtf8Coder.of(); - private static final VarIntCoder intCoder = VarIntCoder.of(); - - private static final BeamSQLRecordTypeCoder INSTANCE = new BeamSQLRecordTypeCoder(); - private BeamSQLRecordTypeCoder(){} - - public static BeamSQLRecordTypeCoder of() { - return INSTANCE; - } - - @Override - public void encode(BeamSQLRecordType value, OutputStream outStream, - org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException { - Context nested = context.nested(); - intCoder.encode(value.size(), outStream, nested); - for(String fieldName : value.getFieldsName()){ - stringCoder.encode(fieldName, outStream, nested); - } - for(SqlTypeName fieldType : value.getFieldsType()){ - stringCoder.encode(fieldType.name(), outStream, nested); - } - outStream.flush(); - } - - @Override - public BeamSQLRecordType decode(InputStream inStream, - org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException { - BeamSQLRecordType typeRecord = new BeamSQLRecordType(); - Context nested = context.nested(); - int size = intCoder.decode(inStream, nested); - for(int idx=0; idx<size; ++idx){ - typeRecord.getFieldsName().add(stringCoder.decode(inStream, nested)); - } - for(int idx=0; idx<size; ++idx){ - typeRecord.getFieldsType().add(SqlTypeName.valueOf(stringCoder.decode(inStream, nested))); - } - return typeRecord; - } - - @Override - public List<? 
extends Coder<?>> getCoderArguments() { - // TODO Auto-generated method stub - return null; - } - - @Override - public void verifyDeterministic() - throws org.apache.beam.sdk.coders.Coder.NonDeterministicException { - // TODO Auto-generated method stub - - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java deleted file mode 100644 index db93168..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java +++ /dev/null @@ -1,242 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.schema; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * Repersent a generic ROW record in Beam SQL. 
- * - */ -public class BeamSQLRow implements Serializable { - /** - * - */ - private static final long serialVersionUID = 4569220242480160895L; - - private List<Integer> nullFields = new ArrayList<>(); - private List<Object> dataValues; - private BeamSQLRecordType dataType; - - public BeamSQLRow(BeamSQLRecordType dataType) { - this.dataType = dataType; - this.dataValues = new ArrayList<>(); - for(int idx=0; idx<dataType.size(); ++idx){ - dataValues.add(null); - } - } - - public BeamSQLRow(BeamSQLRecordType dataType, List<Object> dataValues) { - this.dataValues = dataValues; - this.dataType = dataType; - } - - public void addField(String fieldName, Object fieldValue) { - addField(dataType.getFieldsName().indexOf(fieldName), fieldValue); - } - - public void addField(int index, Object fieldValue) { - if(fieldValue == null){ - dataValues.set(index, fieldValue); - if(!nullFields.contains(index)){nullFields.add(index);} - return; - } - - SqlTypeName fieldType = dataType.getFieldsType().get(index); - switch (fieldType) { - case INTEGER: - case SMALLINT: - case TINYINT: - if(!(fieldValue instanceof Integer)){ - throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - case DOUBLE: - if(!(fieldValue instanceof Double)){ - throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - case BIGINT: - if(!(fieldValue instanceof Long)){ - throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - case FLOAT: - if(!(fieldValue instanceof Float)){ - throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - case VARCHAR: - if(!(fieldValue instanceof String)){ - throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - case TIME: - case TIMESTAMP: - if(!(fieldValue instanceof Date)){ - throw 
new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - default: - throw new UnsupportedDataTypeException(fieldType); - } - dataValues.set(index, fieldValue); - } - - - public int getInteger(int idx) { - return (Integer) getFieldValue(idx); - } - - public double getDouble(int idx) { - return (Double) getFieldValue(idx); - } - - public long getLong(int idx) { - return (Long) getFieldValue(idx); - } - - public String getString(int idx) { - return (String) getFieldValue(idx); - } - - public Date getDate(int idx) { - return (Date) getFieldValue(idx); - } - - public Object getFieldValue(String fieldName) { - return getFieldValue(dataType.getFieldsName().indexOf(fieldName)); - } - - public Object getFieldValue(int fieldIdx) { - if(nullFields.contains(fieldIdx)){ - return null; - } - - Object fieldValue = dataValues.get(fieldIdx); - SqlTypeName fieldType = dataType.getFieldsType().get(fieldIdx); - - switch (fieldType) { - case INTEGER: - case SMALLINT: - case TINYINT: - if(!(fieldValue instanceof Integer)){ - throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - }else{ - return Integer.valueOf(fieldValue.toString()); - } - case DOUBLE: - if(!(fieldValue instanceof Double)){ - throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - }else{ - return Double.valueOf(fieldValue.toString()); - } - case BIGINT: - if(!(fieldValue instanceof Long)){ - throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - }else{ - return Long.valueOf(fieldValue.toString()); - } - case FLOAT: - if(!(fieldValue instanceof Float)){ - throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - }else{ - return Float.valueOf(fieldValue.toString()); - } - case VARCHAR: - if(!(fieldValue instanceof String)){ - throw new 
InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - }else{ - return fieldValue.toString(); - } - case TIME: - case TIMESTAMP: - if(!(fieldValue instanceof Date)){ - throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - }else{ - return fieldValue; - } - default: - throw new UnsupportedDataTypeException(fieldType); - } - } - - public int size() { - return dataValues.size(); - } - - public List<Object> getDataValues() { - return dataValues; - } - - public void setDataValues(List<Object> dataValues) { - this.dataValues = dataValues; - } - - public BeamSQLRecordType getDataType() { - return dataType; - } - - public void setDataType(BeamSQLRecordType dataType) { - this.dataType = dataType; - } - - public void setNullFields(List<Integer> nullFields) { - this.nullFields = nullFields; - } - - public List<Integer> getNullFields() { - return nullFields; - } - - @Override - public String toString() { - return "BeamSQLRow [dataValues=" + dataValues + ", dataType=" + dataType + "]"; - } - - /** - * Return data fields as key=value. 
- */ - public String valueInString() { - StringBuffer sb = new StringBuffer(); - for (int idx = 0; idx < size(); ++idx) { - sb.append(String.format(",%s=%s", dataType.getFieldsName().get(idx), getFieldValue(idx))); - } - return sb.substring(1); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - BeamSQLRow other = (BeamSQLRow) obj; - return toString().equals(other.toString()); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java deleted file mode 100644 index 00af18d..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.beam.dsls.sql.schema; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Date; -import java.util.List; -import org.apache.beam.sdk.coders.BigEndianIntegerCoder; -import org.apache.beam.sdk.coders.BigEndianLongCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.DoubleCoder; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.coders.StandardCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; - -/** - * A {@link Coder} encodes {@link BeamSQLRow}. - * - */ -public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{ - private static final BeamSQLRecordTypeCoder recordTypeCoder = BeamSQLRecordTypeCoder.of(); - - private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of()); - - private static final StringUtf8Coder stringCoder = StringUtf8Coder.of(); - private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of(); - private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of(); - private static final DoubleCoder doubleCoder = DoubleCoder.of(); - - private static final BeamSqlRowCoder INSTANCE = new BeamSqlRowCoder(); - private BeamSqlRowCoder(){} - - public static BeamSqlRowCoder of() { - return INSTANCE; - } - - @Override - public void encode(BeamSQLRow value, OutputStream outStream, - org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException { - recordTypeCoder.encode(value.getDataType(), outStream, context); - listCoder.encode(value.getNullFields(), outStream, context); - - Context nested = context.nested(); - - for (int idx = 0; idx < value.size(); ++idx) { - if(value.getNullFields().contains(idx)){ - continue; - } - - switch (value.getDataType().getFieldsType().get(idx)) { - case INTEGER: - case SMALLINT: - case TINYINT: - intCoder.encode(value.getInteger(idx), outStream, nested); - break; 
- case DOUBLE: - case FLOAT: - doubleCoder.encode(value.getDouble(idx), outStream, nested); - break; - case BIGINT: - longCoder.encode(value.getLong(idx), outStream, nested); - break; - case VARCHAR: - stringCoder.encode(value.getString(idx), outStream, nested); - break; - case TIME: - case TIMESTAMP: - longCoder.encode(value.getDate(idx).getTime(), outStream, nested); - break; - - default: - throw new UnsupportedDataTypeException(value.getDataType().getFieldsType().get(idx)); - } - } - } - - @Override - public BeamSQLRow decode(InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) - throws CoderException, IOException { - BeamSQLRecordType type = recordTypeCoder.decode(inStream, context); - List<Integer> nullFields = listCoder.decode(inStream, context); - - BeamSQLRow record = new BeamSQLRow(type); - record.setNullFields(nullFields); - - for (int idx = 0; idx < type.size(); ++idx) { - if(nullFields.contains(idx)){ - continue; - } - - switch (type.getFieldsType().get(idx)) { - case INTEGER: - case SMALLINT: - case TINYINT: - record.addField(idx, intCoder.decode(inStream, context)); - break; - case DOUBLE: - case FLOAT: - record.addField(idx, doubleCoder.decode(inStream, context)); - break; - case BIGINT: - record.addField(idx, longCoder.decode(inStream, context)); - break; - case VARCHAR: - record.addField(idx, stringCoder.decode(inStream, context)); - break; - case TIME: - case TIMESTAMP: - record.addField(idx, new Date(longCoder.decode(inStream, context))); - break; - - default: - throw new UnsupportedDataTypeException(type.getFieldsType().get(idx)); - } - } - - return record; - } - - @Override - public List<? 
extends Coder<?>> getCoderArguments() { - return null; - } - - @Override - public void verifyDeterministic() - throws org.apache.beam.sdk.coders.Coder.NonDeterministicException { - - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java deleted file mode 100644 index 6240426..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * Unchecked exception for a field value that is not valid, for example a value
 * that does not match the declared type of its field.
 */
public class InvalidFieldException extends RuntimeException {

  /** Creates the exception without a detail message. */
  public InvalidFieldException() {
  }

  /**
   * Creates the exception with a detail message.
   *
   * @param message text describing the invalid field
   */
  public InvalidFieldException(String message) {
    super(message);
  }
}
- */ -package org.beam.dsls.sql.schema; - -import org.apache.calcite.sql.type.SqlTypeName; - -public class UnsupportedDataTypeException extends RuntimeException { - - public UnsupportedDataTypeException(SqlTypeName unsupportedType){ - super(String.format("Not support data type [%s]", unsupportedType)); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java deleted file mode 100644 index 2570763..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.beam.dsls.sql.schema.kafka; - -import java.util.List; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.beam.dsls.sql.schema.BeamSQLRecordType; -import org.beam.dsls.sql.schema.BeamSQLRow; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A Kafka topic that saves records as CSV format. - * - */ -public class BeamKafkaCSVTable extends BeamKafkaTable { - - /** - * - */ - private static final long serialVersionUID = 4754022536543333984L; - - public static final String DELIMITER = ","; - private static final Logger LOG = LoggerFactory.getLogger(BeamKafkaCSVTable.class); - - public BeamKafkaCSVTable(RelProtoDataType protoRowType, String bootstrapServers, - List<String> topics) { - super(protoRowType, bootstrapServers, topics); - } - - @Override - public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>> - getPTransformForInput() { - return new CsvRecorderDecoder(beamSqlRecordType); - } - - @Override - public PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>> - getPTransformForOutput() { - return new CsvRecorderEncoder(beamSqlRecordType); - } - - /** - * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamSQLRow}. 
- * - */ - public static class CsvRecorderDecoder - extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>> { - private BeamSQLRecordType recordType; - - public CsvRecorderDecoder(BeamSQLRecordType recordType) { - this.recordType = recordType; - } - - @Override - public PCollection<BeamSQLRow> expand(PCollection<KV<byte[], byte[]>> input) { - return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamSQLRow>() { - @ProcessElement - public void processElement(ProcessContext c) { - String rowInString = new String(c.element().getValue()); - String[] parts = rowInString.split(BeamKafkaCSVTable.DELIMITER); - if (parts.length != recordType.size()) { - LOG.error(String.format("invalid record: ", rowInString)); - } else { - BeamSQLRow sourceRecord = new BeamSQLRow(recordType); - for (int idx = 0; idx < parts.length; ++idx) { - sourceRecord.addField(idx, parts[idx]); - } - c.output(sourceRecord); - } - } - })); - } - } - - /** - * A PTransform to convert {@link BeamSQLRow} to {@code KV<byte[], byte[]>}. 
- * - */ - public static class CsvRecorderEncoder - extends PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>> { - private BeamSQLRecordType recordType; - - public CsvRecorderEncoder(BeamSQLRecordType recordType) { - this.recordType = recordType; - } - - @Override - public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamSQLRow> input) { - return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSQLRow, KV<byte[], byte[]>>() { - @ProcessElement - public void processElement(ProcessContext c) { - BeamSQLRow in = c.element(); - StringBuffer sb = new StringBuffer(); - for (int idx = 0; idx < in.size(); ++idx) { - sb.append(DELIMITER); - sb.append(in.getFieldValue(idx).toString()); - } - c.output(KV.of(new byte[] {}, sb.substring(1).getBytes())); - } - })); - - } - - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java deleted file mode 100644 index 482383b..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.schema.kafka; - -import static com.google.common.base.Preconditions.checkArgument; -import java.io.Serializable; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.coders.ByteArrayCoder; -import org.apache.beam.sdk.io.kafka.KafkaIO; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.beam.dsls.sql.schema.BaseBeamTable; -import org.beam.dsls.sql.schema.BeamIOType; -import org.beam.dsls.sql.schema.BeamSQLRow; - -/** - * {@code BeamKafkaTable} represent a Kafka topic, as source or target. Need to - * extend to convert between {@code BeamSQLRow} and {@code KV<byte[], byte[]>}. 
- * - */ -public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable { - - /** - * - */ - private static final long serialVersionUID = -634715473399906527L; - - private String bootstrapServers; - private List<String> topics; - private Map<String, Object> configUpdates; - - protected BeamKafkaTable(RelProtoDataType protoRowType) { - super(protoRowType); - } - - public BeamKafkaTable(RelProtoDataType protoRowType, String bootstrapServers, - List<String> topics) { - super(protoRowType); - this.bootstrapServers = bootstrapServers; - this.topics = topics; - } - - public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) { - this.configUpdates = configUpdates; - return this; - } - - @Override - public BeamIOType getSourceType() { - return BeamIOType.UNBOUNDED; - } - - public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>> - getPTransformForInput(); - - public abstract PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>> - getPTransformForOutput(); - - @Override - public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() { - return new PTransform<PBegin, PCollection<BeamSQLRow>>() { - - @Override - public PCollection<BeamSQLRow> expand(PBegin input) { - return input.apply("read", - KafkaIO.<byte[], byte[]>read().withBootstrapServers(bootstrapServers).withTopics(topics) - .updateConsumerProperties(configUpdates).withKeyCoder(ByteArrayCoder.of()) - .withValueCoder(ByteArrayCoder.of()).withoutMetadata()) - .apply("in_format", getPTransformForInput()); - - } - }; - } - - @Override - public PTransform<? 
super PCollection<BeamSQLRow>, PDone> buildIOWriter() { - checkArgument(topics != null && topics.size() == 1, - "Only one topic can be acceptable as output."); - - return new PTransform<PCollection<BeamSQLRow>, PDone>() { - @Override - public PDone expand(PCollection<BeamSQLRow> input) { - return input.apply("out_reformat", getPTransformForOutput()).apply("persistent", - KafkaIO.<byte[], byte[]>write().withBootstrapServers(bootstrapServers) - .withTopic(topics.get(0)).withKeyCoder(ByteArrayCoder.of()) - .withValueCoder(ByteArrayCoder.of())); - } - }; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java deleted file mode 100644 index 822fce7..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * table schema for KafkaIO. 
- */ -package org.beam.dsls.sql.schema.kafka; http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java deleted file mode 100644 index ef9cc7d..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * define table schema, to map with Beam IO components. 
- * - */ -package org.beam.dsls.sql.schema; http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java deleted file mode 100644 index 06db280..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.transform; - -import java.util.List; -import org.apache.beam.sdk.transforms.DoFn; -import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor; -import org.beam.dsls.sql.rel.BeamFilterRel; -import org.beam.dsls.sql.schema.BeamSQLRow; - -/** - * {@code BeamSQLFilterFn} is the executor for a {@link BeamFilterRel} step. 
- * - */ -public class BeamSQLFilterFn extends DoFn<BeamSQLRow, BeamSQLRow> { - /** - * - */ - private static final long serialVersionUID = -1256111753670606705L; - - private String stepName; - private BeamSQLExpressionExecutor executor; - - public BeamSQLFilterFn(String stepName, BeamSQLExpressionExecutor executor) { - super(); - this.stepName = stepName; - this.executor = executor; - } - - @Setup - public void setup() { - executor.prepare(); - } - - @ProcessElement - public void processElement(ProcessContext c) { - BeamSQLRow in = c.element(); - - List<Object> result = executor.execute(in); - - if ((Boolean) result.get(0)) { - c.output(in); - } - } - - @Teardown - public void close() { - executor.close(); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java deleted file mode 100644 index 1014c0d..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.transform; - -import org.apache.beam.sdk.transforms.DoFn; -import org.beam.dsls.sql.schema.BeamSQLRow; - -/** - * A test PTransform to display output in console. - * - */ -public class BeamSQLOutputToConsoleFn extends DoFn<BeamSQLRow, Void> { - /** - * - */ - private static final long serialVersionUID = -1256111753670606705L; - - private String stepName; - - public BeamSQLOutputToConsoleFn(String stepName) { - super(); - this.stepName = stepName; - } - - @ProcessElement - public void processElement(ProcessContext c) { - System.out.println("Output: " + c.element().getDataValues()); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java deleted file mode 100644 index 12061d2..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.transform; - -import java.util.List; -import org.apache.beam.sdk.transforms.DoFn; -import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor; -import org.beam.dsls.sql.rel.BeamProjectRel; -import org.beam.dsls.sql.schema.BeamSQLRecordType; -import org.beam.dsls.sql.schema.BeamSQLRow; - -/** - * - * {@code BeamSQLProjectFn} is the executor for a {@link BeamProjectRel} step. - * - */ -public class BeamSQLProjectFn extends DoFn<BeamSQLRow, BeamSQLRow> { - - /** - * - */ - private static final long serialVersionUID = -1046605249999014608L; - private String stepName; - private BeamSQLExpressionExecutor executor; - private BeamSQLRecordType outputRecordType; - - public BeamSQLProjectFn(String stepName, BeamSQLExpressionExecutor executor, - BeamSQLRecordType outputRecordType) { - super(); - this.stepName = stepName; - this.executor = executor; - this.outputRecordType = outputRecordType; - } - - @Setup - public void setup() { - executor.prepare(); - } - - @ProcessElement - public void processElement(ProcessContext c) { - List<Object> results = executor.execute(c.element()); - - BeamSQLRow outRow = new BeamSQLRow(outputRecordType); - for (int idx = 0; idx < results.size(); ++idx) { - outRow.addField(idx, results.get(idx)); - } - - c.output(outRow); - } - - @Teardown - public void close() { - executor.close(); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java deleted file mode 100644 index 2607abf..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to 
the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * {@link org.apache.beam.sdk.transforms.PTransform} used in a BeamSQL pipeline. - */ -package org.beam.dsls.sql.transform; http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java new file mode 100644 index 0000000..733b056 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.planner; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.apache.beam.dsls.sql.schema.BaseBeamTable; +import org.apache.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.junit.BeforeClass; + +/** + * prepare {@code BeamSqlRunner} for test. 
+ * + */ +public class BasePlanner { + public static BeamSqlRunner runner = new BeamSqlRunner(); + + @BeforeClass + public static void prepare() { + runner.addTable("ORDER_DETAILS", getTable()); + runner.addTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders")); + runner.addTable("SUB_ORDER_RAM", getTable()); + } + + private static BaseBeamTable getTable() { + final RelProtoDataType protoRowType = new RelProtoDataType() { + @Override + public RelDataType apply(RelDataTypeFactory a0) { + return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER) + .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build(); + } + }; + + return new MockedBeamSQLTable(protoRowType); + } + + public static BaseBeamTable getTable(String bootstrapServer, String topic) { + final RelProtoDataType protoRowType = new RelProtoDataType() { + @Override + public RelDataType apply(RelDataTypeFactory a0) { + return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER) + .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build(); + } + }; + + Map<String, Object> consumerPara = new HashMap<String, Object>(); + consumerPara.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + + return new BeamKafkaCSVTable(protoRowType, bootstrapServer, Arrays.asList(topic)) + .updateConsumerProperties(consumerPara); + } +}
