[BEAM-2740] Hide BeamSqlEnv.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/49aad927 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/49aad927 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/49aad927 Branch: refs/heads/DSL_SQL Commit: 49aad927d4d9cf58c30c04641c766a62d44f44b7 Parents: 9eec6a0 Author: James Xu <[email protected]> Authored: Wed Aug 9 18:54:54 2017 +0800 Committer: Tyler Akidau <[email protected]> Committed: Tue Aug 15 11:40:39 2017 -0700 ---------------------------------------------------------------------- .../sdk/extensions/sql/BeamRecordSqlType.java | 185 ++++++++ .../apache/beam/sdk/extensions/sql/BeamSql.java | 113 ++--- .../beam/sdk/extensions/sql/BeamSqlCli.java | 65 --- .../beam/sdk/extensions/sql/BeamSqlEnv.java | 127 ------ .../sdk/extensions/sql/BeamSqlRecordHelper.java | 217 +++++++++ .../beam/sdk/extensions/sql/BeamSqlUdf.java | 41 ++ .../extensions/sql/example/BeamSqlExample.java | 2 +- .../sdk/extensions/sql/impl/BeamSqlCli.java | 65 +++ .../sdk/extensions/sql/impl/BeamSqlEnv.java | 135 ++++++ .../sdk/extensions/sql/impl/package-info.java | 22 + .../sql/impl/planner/BeamQueryPlanner.java | 9 +- .../sql/impl/rel/BeamAggregationRel.java | 4 +- .../extensions/sql/impl/rel/BeamFilterRel.java | 2 +- .../extensions/sql/impl/rel/BeamIOSinkRel.java | 6 +- .../sql/impl/rel/BeamIOSourceRel.java | 6 +- .../sql/impl/rel/BeamIntersectRel.java | 2 +- .../extensions/sql/impl/rel/BeamJoinRel.java | 4 +- .../extensions/sql/impl/rel/BeamMinusRel.java | 2 +- .../extensions/sql/impl/rel/BeamProjectRel.java | 2 +- .../extensions/sql/impl/rel/BeamRelNode.java | 5 +- .../sql/impl/rel/BeamSetOperatorRelBase.java | 2 +- .../extensions/sql/impl/rel/BeamSortRel.java | 4 +- .../extensions/sql/impl/rel/BeamUnionRel.java | 2 +- .../extensions/sql/impl/rel/BeamValuesRel.java | 6 +- .../sql/impl/schema/BaseBeamTable.java | 35 ++ .../extensions/sql/impl/schema/BeamIOType.java | 28 ++ .../sql/impl/schema/BeamPCollectionTable.java | 63 +++ .../sql/impl/schema/BeamSqlTable.java | 54 +++ .../sql/impl/schema/BeamTableUtils.java | 118 +++++ .../impl/schema/kafka/BeamKafkaCSVTable.java | 109 +++++ .../sql/impl/schema/kafka/BeamKafkaTable.java | 109 +++++ .../sql/impl/schema/kafka/package-info.java | 22 + .../sql/impl/schema/package-info.java | 22 + .../sql/impl/schema/text/BeamTextCSVTable.java | 70 +++ .../schema/text/BeamTextCSVTableIOReader.java | 58 +++ .../schema/text/BeamTextCSVTableIOWriter.java | 58 +++ .../sql/impl/schema/text/BeamTextTable.java | 41 ++ .../sql/impl/schema/text/package-info.java | 22 + .../transform/BeamAggregationTransforms.java | 4 +- .../sql/impl/transform/BeamJoinTransforms.java | 4 +- .../sql/impl/transform/BeamSqlProjectFn.java | 4 +- .../extensions/sql/impl/utils/CalciteUtils.java | 2 +- .../extensions/sql/schema/BaseBeamTable.java | 34 -- .../sdk/extensions/sql/schema/BeamIOType.java | 28 -- .../sql/schema/BeamPCollectionTable.java | 62 --- .../sql/schema/BeamRecordSqlType.java | 185 -------- .../sql/schema/BeamSqlRecordHelper.java | 217 --------- .../sdk/extensions/sql/schema/BeamSqlTable.java | 53 --- .../sdk/extensions/sql/schema/BeamSqlUdf.java | 41 -- .../extensions/sql/schema/BeamTableUtils.java | 117 ----- .../sql/schema/kafka/BeamKafkaCSVTable.java | 109 ----- .../sql/schema/kafka/BeamKafkaTable.java | 109 ----- .../sql/schema/kafka/package-info.java | 22 - .../sdk/extensions/sql/schema/package-info.java | 22 - .../sql/schema/text/BeamTextCSVTable.java | 70 --- .../schema/text/BeamTextCSVTableIOReader.java | 58 --- .../schema/text/BeamTextCSVTableIOWriter.java | 58 --- .../sql/schema/text/BeamTextTable.java | 41 -- .../sql/schema/text/package-info.java | 22 - .../extensions/sql/BeamSqlApiSurfaceTest.java | 12 +- .../sql/BeamSqlDslAggregationTest.java | 1 - .../beam/sdk/extensions/sql/BeamSqlDslBase.java | 1 - .../sdk/extensions/sql/BeamSqlDslJoinTest.java | 1 - .../extensions/sql/BeamSqlDslProjectTest.java | 1 - .../extensions/sql/BeamSqlDslUdfUdafTest.java | 2 - .../beam/sdk/extensions/sql/TestUtils.java | 1 - .../interpreter/BeamSqlFnExecutorTestBase.java | 2 +- .../extensions/sql/impl/rel/BaseRelTest.java | 34 ++ .../sql/impl/rel/BeamIntersectRelTest.java | 9 +- .../rel/BeamJoinRelBoundedVsBoundedTest.java | 23 +- .../rel/BeamJoinRelUnboundedVsBoundedTest.java | 25 +- .../BeamJoinRelUnboundedVsUnboundedTest.java | 19 +- .../sql/impl/rel/BeamMinusRelTest.java | 9 +- .../impl/rel/BeamSetOperatorRelBaseTest.java | 9 +- .../sql/impl/rel/BeamSortRelTest.java | 17 +- .../sql/impl/rel/BeamUnionRelTest.java | 9 +- .../sql/impl/rel/BeamValuesRelTest.java | 11 +- .../sql/impl/schema/BeamSqlRowCoderTest.java | 77 ++++ .../schema/kafka/BeamKafkaCSVTableTest.java | 107 +++++ .../impl/schema/text/BeamTextCSVTableTest.java | 176 +++++++ .../transform/BeamAggregationTransformTest.java | 453 +++++++++++++++++++ .../schema/transform/BeamTransformBaseTest.java | 97 ++++ ...mSqlBuiltinFunctionsIntegrationTestBase.java | 2 +- ...amSqlComparisonOperatorsIntegrationTest.java | 2 +- .../extensions/sql/mock/MockedBoundedTable.java | 4 +- .../sdk/extensions/sql/mock/MockedTable.java | 4 +- .../sql/mock/MockedUnboundedTable.java | 4 +- .../sql/schema/BeamSqlRowCoderTest.java | 76 ---- .../sql/schema/kafka/BeamKafkaCSVTableTest.java | 107 ----- .../sql/schema/text/BeamTextCSVTableTest.java | 176 ------- .../transform/BeamAggregationTransformTest.java | 453 ------------------- .../schema/transform/BeamTransformBaseTest.java | 97 ---- 92 files changed, 2575 insertions(+), 2545 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java new file mode 100644 index 0000000..5269867 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql; + +import java.math.BigDecimal; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.coders.BigDecimalCoder; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.coders.ByteCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper.BooleanCoder; +import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper.DateCoder; +import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper.DoubleCoder; +import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper.FloatCoder; +import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper.ShortCoder; +import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper.TimeCoder; +import org.apache.beam.sdk.values.BeamRecord; +import org.apache.beam.sdk.values.BeamRecordType; + +/** + * Type provider for {@link BeamRecord} with SQL types. + * + * <p>Limited SQL types are supported now, visit + * <a href="https://beam.apache.org/blog/2017/07/21/sql-dsl.html#data-type">data types</a> + * for more details. + * + */ +public class BeamRecordSqlType extends BeamRecordType { + private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>(); + static { + SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class); + + SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class); + + SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class); + + SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class); + + SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class); + } + + public List<Integer> fieldTypes; + + protected BeamRecordSqlType(List<String> fieldsName, List<Coder> fieldsCoder) { + super(fieldsName, fieldsCoder); + } + + private BeamRecordSqlType(List<String> fieldsName, List<Integer> fieldTypes + , List<Coder> fieldsCoder) { + super(fieldsName, fieldsCoder); + this.fieldTypes = fieldTypes; + } + + public static BeamRecordSqlType create(List<String> fieldNames, + List<Integer> fieldTypes) { + if (fieldNames.size() != fieldTypes.size()) { + throw new IllegalStateException("the sizes of 'dataType' and 'fieldTypes' must match."); + } + List<Coder> fieldCoders = new ArrayList<>(fieldTypes.size()); + for (int idx = 0; idx < fieldTypes.size(); ++idx) { + switch (fieldTypes.get(idx)) { + case Types.INTEGER: + fieldCoders.add(BigEndianIntegerCoder.of()); + break; + case Types.SMALLINT: + fieldCoders.add(ShortCoder.of()); + break; + case Types.TINYINT: + fieldCoders.add(ByteCoder.of()); + break; + case Types.DOUBLE: + fieldCoders.add(DoubleCoder.of()); + break; + case Types.FLOAT: + fieldCoders.add(FloatCoder.of()); + break; + case Types.DECIMAL: + fieldCoders.add(BigDecimalCoder.of()); + break; + case Types.BIGINT: + fieldCoders.add(BigEndianLongCoder.of()); + break; + case Types.VARCHAR: + case Types.CHAR: + fieldCoders.add(StringUtf8Coder.of()); + break; + case Types.TIME: + fieldCoders.add(TimeCoder.of()); + break; + case Types.DATE: + case Types.TIMESTAMP: + fieldCoders.add(DateCoder.of()); + break; + case Types.BOOLEAN: + fieldCoders.add(BooleanCoder.of()); + break; + + default: + throw new UnsupportedOperationException( + "Data type: " + fieldTypes.get(idx) + " not supported yet!"); + } + } + return new BeamRecordSqlType(fieldNames, fieldTypes, fieldCoders); + } + + @Override + public void validateValueType(int index, Object fieldValue) throws IllegalArgumentException { + if (null == fieldValue) {// no need to do type check for NULL value + return; + } + + int fieldType = fieldTypes.get(index); + Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(fieldType); + if (javaClazz == null) { + throw new IllegalArgumentException("Data type: " + fieldType + " not supported yet!"); + } + + if (!fieldValue.getClass().equals(javaClazz)) { + throw new IllegalArgumentException( + String.format("[%s](%s) doesn't match type [%s]", + fieldValue, fieldValue.getClass(), fieldType) + ); + } + } + + public List<Integer> getFieldTypes() { + return fieldTypes; + } + + public Integer getFieldTypeByIndex(int index){ + return fieldTypes.get(index); + } + + @Override + public boolean equals(Object obj) { + if (obj != null && obj instanceof BeamRecordSqlType) { + BeamRecordSqlType ins = (BeamRecordSqlType) obj; + return fieldTypes.equals(ins.getFieldTypes()) && getFieldNames().equals(ins.getFieldNames()); + } else { + return false; + } + } + + @Override + public int hashCode() { + return 31 * getFieldNames().hashCode() + getFieldTypes().hashCode(); + } + + @Override + public String toString() { + return "BeamRecordSqlType [fieldNames=" + getFieldNames() + + ", fieldTypes=" + fieldTypes + "]"; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java index bf6a9c0..34355fb 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java @@ -17,13 +17,11 @@ */ package org.apache.beam.sdk.extensions.sql; -import com.google.auto.value.AutoValue; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.BeamRecordCoder; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; -import org.apache.beam.sdk.extensions.sql.schema.BeamPCollectionTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf; +import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -94,10 +92,7 @@ public class BeamSql { * </ul> */ public static QueryTransform query(String sqlQuery) { - return QueryTransform.builder() - .setSqlEnv(new BeamSqlEnv()) - .setSqlQuery(sqlQuery) - .build(); + return new QueryTransform(sqlQuery); } /** @@ -109,10 +104,7 @@ public class BeamSql { * <p>Make sure to query it from a static table name <em>PCOLLECTION</em>. */ public static SimpleQueryTransform simpleQuery(String sqlQuery) { - return SimpleQueryTransform.builder() - .setSqlEnv(new BeamSqlEnv()) - .setSqlQuery(sqlQuery) - .build(); + return new SimpleQueryTransform(sqlQuery); } /** @@ -121,28 +113,22 @@ public class BeamSql { * <p>The table names in the input {@code PCollectionTuple} are only valid during the current * query. */ - @AutoValue - public abstract static class QueryTransform extends + public static class QueryTransform extends PTransform<PCollectionTuple, PCollection<BeamRecord>> { - abstract BeamSqlEnv getSqlEnv(); - abstract String getSqlQuery(); + private BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); + private String sqlQuery; - static Builder builder() { - return new AutoValue_BeamSql_QueryTransform.Builder(); - } - - @AutoValue.Builder - abstract static class Builder { - abstract Builder setSqlQuery(String sqlQuery); - abstract Builder setSqlEnv(BeamSqlEnv sqlEnv); - abstract QueryTransform build(); + public QueryTransform(String sqlQuery) { + this.sqlQuery = sqlQuery; } /** * register a UDF function used in this query. + * + * <p>Refer to {@link BeamSqlUdf} for more about how to implement a UDF in BeamSql. */ public QueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz){ - getSqlEnv().registerUdf(functionName, clazz); + beamSqlEnv.registerUdf(functionName, clazz); return this; } /** @@ -150,7 +136,7 @@ public class BeamSql { * Note, {@link SerializableFunction} must have a constructor without arguments. */ public QueryTransform withUdf(String functionName, SerializableFunction sfn){ - getSqlEnv().registerUdf(functionName, sfn); + beamSqlEnv.registerUdf(functionName, sfn); return this; } @@ -158,7 +144,7 @@ public class BeamSql { * register a {@link CombineFn} as UDAF function used in this query. */ public QueryTransform withUdaf(String functionName, CombineFn combineFn){ - getSqlEnv().registerUdaf(functionName, combineFn); + beamSqlEnv.registerUdaf(functionName, combineFn); return this; } @@ -168,13 +154,13 @@ public class BeamSql { BeamRelNode beamRelNode = null; try { - beamRelNode = getSqlEnv().planner.convertToBeamRel(getSqlQuery()); + beamRelNode = beamSqlEnv.getPlanner().convertToBeamRel(sqlQuery); } catch (ValidationException | RelConversionException | SqlParseException e) { throw new IllegalStateException(e); } try { - return beamRelNode.buildBeamPipeline(input, getSqlEnv()); + return beamRelNode.buildBeamPipeline(input, beamSqlEnv); } catch (Exception e) { throw new IllegalStateException(e); } @@ -186,7 +172,7 @@ public class BeamSql { PCollection<BeamRecord> sourceStream = (PCollection<BeamRecord>) input.get(sourceTag); BeamRecordCoder sourceCoder = (BeamRecordCoder) sourceStream.getCoder(); - getSqlEnv().registerTable(sourceTag.getId(), + beamSqlEnv.registerTable(sourceTag.getId(), new BeamPCollectionTable(sourceStream, (BeamRecordSqlType) sourceCoder.getRecordType())); } @@ -197,53 +183,47 @@ public class BeamSql { * A {@link PTransform} representing an execution plan for a SQL query referencing * a single table. */ - @AutoValue - public abstract static class SimpleQueryTransform + public static class SimpleQueryTransform extends PTransform<PCollection<BeamRecord>, PCollection<BeamRecord>> { private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION"; - abstract BeamSqlEnv getSqlEnv(); - abstract String getSqlQuery(); + private QueryTransform delegate; - static Builder builder() { - return new AutoValue_BeamSql_SimpleQueryTransform.Builder(); + public SimpleQueryTransform(String sqlQuery) { + this.delegate = new QueryTransform(sqlQuery); } - @AutoValue.Builder - abstract static class Builder { - abstract Builder setSqlQuery(String sqlQuery); - abstract Builder setSqlEnv(BeamSqlEnv sqlEnv); - abstract SimpleQueryTransform build(); + /** + * register a UDF function used in this query. + * + * <p>Refer to {@link BeamSqlUdf} for more about how to implement a UDAF in BeamSql. + */ + public SimpleQueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz){ + delegate.withUdf(functionName, clazz); + return this; } /** - * register a UDF function used in this query. + * register {@link SerializableFunction} as a UDF function used in this query. + * Note, {@link SerializableFunction} must have a constructor without arguments. */ - public SimpleQueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz){ - getSqlEnv().registerUdf(functionName, clazz); - return this; - } - /** - * register {@link SerializableFunction} as a UDF function used in this query. - * Note, {@link SerializableFunction} must have a constructor without arguments. - */ - public SimpleQueryTransform withUdf(String functionName, SerializableFunction sfn){ - getSqlEnv().registerUdf(functionName, sfn); - return this; - } + public SimpleQueryTransform withUdf(String functionName, SerializableFunction sfn){ + delegate.withUdf(functionName, sfn); + return this; + } - /** - * register a {@link CombineFn} as UDAF function used in this query. - */ - public SimpleQueryTransform withUdaf(String functionName, CombineFn combineFn){ - getSqlEnv().registerUdaf(functionName, combineFn); - return this; - } + /** + * register a {@link CombineFn} as UDAF function used in this query. + */ + public SimpleQueryTransform withUdaf(String functionName, CombineFn combineFn){ + delegate.withUdaf(functionName, combineFn); + return this; + } private void validateQuery() { SqlNode sqlNode; try { - sqlNode = getSqlEnv().planner.parseQuery(getSqlQuery()); - getSqlEnv().planner.getPlanner().close(); + sqlNode = delegate.beamSqlEnv.getPlanner().parseQuery(delegate.sqlQuery); + delegate.beamSqlEnv.getPlanner().getPlanner().close(); } catch (SqlParseException e) { throw new IllegalStateException(e); } @@ -264,10 +244,7 @@ public class BeamSql { public PCollection<BeamRecord> expand(PCollection<BeamRecord> input) { validateQuery(); return PCollectionTuple.of(new TupleTag<BeamRecord>(PCOLLECTION_TABLE_NAME), input) - .apply(QueryTransform.builder() - .setSqlEnv(getSqlEnv()) - .setSqlQuery(getSqlQuery()) - .build()); + .apply(delegate); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java deleted file mode 100644 index a43808e..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.values.BeamRecord; -import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.plan.RelOptUtil; - -/** - * {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client. - */ -@Experimental -public class BeamSqlCli { - /** - * Returns a human readable representation of the query execution plan. - */ - public static String explainQuery(String sqlString, BeamSqlEnv sqlEnv) throws Exception { - BeamRelNode exeTree = sqlEnv.planner.convertToBeamRel(sqlString); - String beamPlan = RelOptUtil.toString(exeTree); - return beamPlan; - } - - /** - * compile SQL, and return a {@link Pipeline}. - */ - public static PCollection<BeamRecord> compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv) - throws Exception{ - PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation() - .as(PipelineOptions.class); // FlinkPipelineOptions.class - options.setJobName("BeamPlanCreator"); - Pipeline pipeline = Pipeline.create(options); - - return compilePipeline(sqlStatement, pipeline, sqlEnv); - } - - /** - * compile SQL, and return a {@link Pipeline}. - */ - public static PCollection<BeamRecord> compilePipeline(String sqlStatement, Pipeline basePipeline, - BeamSqlEnv sqlEnv) throws Exception{ - PCollection<BeamRecord> resultStream = - sqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline, sqlEnv); - return resultStream; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java deleted file mode 100644 index 79f2b32..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql; - -import java.io.Serializable; -import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.UdafImpl; -import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; -import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.calcite.DataContext; -import org.apache.calcite.linq4j.Enumerable; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.schema.ScannableTable; -import org.apache.calcite.schema.Schema; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.schema.Statistic; -import org.apache.calcite.schema.Statistics; -import org.apache.calcite.schema.impl.ScalarFunctionImpl; -import org.apache.calcite.tools.Frameworks; - -/** - * {@link BeamSqlEnv} prepares the execution context for {@link BeamSql} and {@link BeamSqlCli}. - * - * <p>It contains a {@link SchemaPlus} which holds the metadata of tables/UDF functions, and - * a {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL queries. - */ -public class BeamSqlEnv implements Serializable{ - transient SchemaPlus schema; - transient BeamQueryPlanner planner; - - public BeamSqlEnv() { - schema = Frameworks.createRootSchema(true); - planner = new BeamQueryPlanner(schema); - } - - /** - * Register a UDF function which can be used in SQL expression. - */ - public void registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) { - schema.add(functionName, ScalarFunctionImpl.create(clazz, BeamSqlUdf.UDF_METHOD)); - } - - /** - * register {@link SerializableFunction} as a UDF function which can be used in SQL expression. - * Note, {@link SerializableFunction} must have a constructor without arguments. - */ - public void registerUdf(String functionName, SerializableFunction sfn) { - schema.add(functionName, ScalarFunctionImpl.create(sfn.getClass(), "apply")); - } - - /** - * Register a {@link CombineFn} as UDAF function which can be used in GROUP-BY expression. - */ - public void registerUdaf(String functionName, CombineFn combineFn) { - schema.add(functionName, new UdafImpl(combineFn)); - } - - /** - * Registers a {@link BaseBeamTable} which can be used for all subsequent queries. - * - */ - public void registerTable(String tableName, BaseBeamTable table) { - schema.add(tableName, new BeamCalciteTable(table.getRowType())); - planner.getSourceTables().put(tableName, table); - } - - /** - * Find {@link BaseBeamTable} by table name. - */ - public BaseBeamTable findTable(String tableName){ - return planner.getSourceTables().get(tableName); - } - - private static class BeamCalciteTable implements ScannableTable, Serializable { - private BeamRecordSqlType beamSqlRowType; - public BeamCalciteTable(BeamRecordSqlType beamSqlRowType) { - this.beamSqlRowType = beamSqlRowType; - } - @Override - public RelDataType getRowType(RelDataTypeFactory typeFactory) { - return CalciteUtils.toCalciteRowType(this.beamSqlRowType) - .apply(BeamQueryPlanner.TYPE_FACTORY); - } - - @Override - public Enumerable<Object[]> scan(DataContext root) { - // not used as Beam SQL uses its own execution engine - return null; - } - - /** - * Not used {@link Statistic} to optimize the plan. - */ - @Override - public Statistic getStatistic() { - return Statistics.UNKNOWN; - } - - /** - * all sources are treated as TABLE in Beam SQL. - */ - @Override - public Schema.TableType getJdbcTableType() { - return Schema.TableType.TABLE; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlRecordHelper.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlRecordHelper.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlRecordHelper.java new file mode 100644 index 0000000..870165d --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlRecordHelper.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.math.BigDecimal; +import java.util.Date; +import java.util.GregorianCalendar; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.BigDecimalCoder; +import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.values.BeamRecord; + +/** + * A {@link Coder} encodes {@link BeamRecord}. + */ +@Experimental +public class BeamSqlRecordHelper { + + public static BeamRecordSqlType getSqlRecordType(BeamRecord record) { + return (BeamRecordSqlType) record.getDataType(); + } + + /** + * {@link Coder} for Java type {@link Short}. + */ + public static class ShortCoder extends CustomCoder<Short> { + private static final ShortCoder INSTANCE = new ShortCoder(); + + public static ShortCoder of() { + return INSTANCE; + } + + private ShortCoder() { + } + + @Override + public void encode(Short value, OutputStream outStream) throws CoderException, IOException { + new DataOutputStream(outStream).writeShort(value); + } + + @Override + public Short decode(InputStream inStream) throws CoderException, IOException { + return new DataInputStream(inStream).readShort(); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + } + } + /** + * {@link Coder} for Java type {@link Float}, it's stored as {@link BigDecimal}. + */ + public static class FloatCoder extends CustomCoder<Float> { + private static final FloatCoder INSTANCE = new FloatCoder(); + private static final BigDecimalCoder CODER = BigDecimalCoder.of(); + + public static FloatCoder of() { + return INSTANCE; + } + + private FloatCoder() { + } + + @Override + public void encode(Float value, OutputStream outStream) throws CoderException, IOException { + CODER.encode(new BigDecimal(value), outStream); + } + + @Override + public Float decode(InputStream inStream) throws CoderException, IOException { + return CODER.decode(inStream).floatValue(); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + } + } + /** + * {@link Coder} for Java type {@link Double}, it's stored as {@link BigDecimal}. + */ + public static class DoubleCoder extends CustomCoder<Double> { + private static final DoubleCoder INSTANCE = new DoubleCoder(); + private static final BigDecimalCoder CODER = BigDecimalCoder.of(); + + public static DoubleCoder of() { + return INSTANCE; + } + + private DoubleCoder() { + } + + @Override + public void encode(Double value, OutputStream outStream) throws CoderException, IOException { + CODER.encode(new BigDecimal(value), outStream); + } + + @Override + public Double decode(InputStream inStream) throws CoderException, IOException { + return CODER.decode(inStream).doubleValue(); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + } + } + + /** + * {@link Coder} for Java type {@link GregorianCalendar}, it's stored as {@link Long}. + */ + public static class TimeCoder extends CustomCoder<GregorianCalendar> { + private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of(); + private static final TimeCoder INSTANCE = new TimeCoder(); + + public static TimeCoder of() { + return INSTANCE; + } + + private TimeCoder() { + } + + @Override + public void encode(GregorianCalendar value, OutputStream outStream) + throws CoderException, IOException { + longCoder.encode(value.getTime().getTime(), outStream); + } + + @Override + public GregorianCalendar decode(InputStream inStream) throws CoderException, IOException { + GregorianCalendar calendar = new GregorianCalendar(); + calendar.setTime(new Date(longCoder.decode(inStream))); + return calendar; + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + } + } + /** + * {@link Coder} for Java type {@link Date}, it's stored as {@link Long}. + */ + public static class DateCoder extends CustomCoder<Date> { + private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of(); + private static final DateCoder INSTANCE = new DateCoder(); + + public static DateCoder of() { + return INSTANCE; + } + + private DateCoder() { + } + + @Override + public void encode(Date value, OutputStream outStream) throws CoderException, IOException { + longCoder.encode(value.getTime(), outStream); + } + + @Override + public Date decode(InputStream inStream) throws CoderException, IOException { + return new Date(longCoder.decode(inStream)); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + } + } + + /** + * {@link Coder} for Java type {@link Boolean}. + */ + public static class BooleanCoder extends CustomCoder<Boolean> { + private static final BooleanCoder INSTANCE = new BooleanCoder(); + + public static BooleanCoder of() { + return INSTANCE; + } + + private BooleanCoder() { + } + + @Override + public void encode(Boolean value, OutputStream outStream) throws CoderException, IOException { + new DataOutputStream(outStream).writeBoolean(value); + } + + @Override + public Boolean decode(InputStream inStream) throws CoderException, IOException { + return new DataInputStream(inStream).readBoolean(); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java new file mode 100644 index 0000000..d4828e7 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql; + +import java.io.Serializable; + +/** + * Interface to create a UDF in Beam SQL. + * + * <p>A static method {@code eval} is required. Here is an example: + * + * <blockquote><pre> + * public static class MyLeftFunction { + * public String eval( + * @Parameter(name = "s") String s, + * @Parameter(name = "n", optional = true) Integer n) { + * return s.substring(0, n == null ? 1 : n); + * } + * }</pre></blockquote> + * + * <p>The first parameter is named "s" and is mandatory, + * and the second parameter is named "n" and is optional. + */ +public interface BeamSqlUdf extends Serializable { + String UDF_METHOD = "eval"; +} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java index 91251cf..0c5dae1 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java @@ -21,8 +21,8 @@ import java.sql.Types; import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; import org.apache.beam.sdk.extensions.sql.BeamSql; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlCli.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlCli.java new file mode 100644 index 0000000..5c7d920 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlCli.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.values.BeamRecord; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.plan.RelOptUtil; + +/** + * {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client. + */ +@Experimental +public class BeamSqlCli { + /** + * Returns a human readable representation of the query execution plan. + */ + public static String explainQuery(String sqlString, BeamSqlEnv sqlEnv) throws Exception { + BeamRelNode exeTree = sqlEnv.getPlanner().convertToBeamRel(sqlString); + String beamPlan = RelOptUtil.toString(exeTree); + return beamPlan; + } + + /** + * compile SQL, and return a {@link Pipeline}. + */ + public static PCollection<BeamRecord> compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv) + throws Exception{ + PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation() + .as(PipelineOptions.class); + options.setJobName("BeamPlanCreator"); + Pipeline pipeline = Pipeline.create(options); + + return compilePipeline(sqlStatement, pipeline, sqlEnv); + } + + /** + * compile SQL, and return a {@link Pipeline}. + */ + public static PCollection<BeamRecord> compilePipeline(String sqlStatement, Pipeline basePipeline, + BeamSqlEnv sqlEnv) throws Exception{ + PCollection<BeamRecord> resultStream = sqlEnv.getPlanner() + .compileBeamPipeline(sqlStatement, basePipeline, sqlEnv); + return resultStream; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java new file mode 100644 index 0000000..fcc9079 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl; + +import java.io.Serializable; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; +import org.apache.beam.sdk.extensions.sql.BeamSql; +import org.apache.beam.sdk.extensions.sql.BeamSqlUdf; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.UdafImpl; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; +import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; +import org.apache.beam.sdk.extensions.sql.impl.schema.BeamSqlTable; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Statistics; +import org.apache.calcite.schema.impl.ScalarFunctionImpl; +import org.apache.calcite.tools.Frameworks; + +/** + * {@link BeamSqlEnv} prepares the execution context for {@link BeamSql} and + * {@link BeamSqlCli}. + * + * <p>It contains a {@link SchemaPlus} which holds the metadata of tables/UDF functions, + * and a {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL queries. + */ +public class BeamSqlEnv implements Serializable{ + transient SchemaPlus schema; + transient BeamQueryPlanner planner; + + public BeamSqlEnv() { + schema = Frameworks.createRootSchema(true); + planner = new BeamQueryPlanner(schema); + } + + /** + * Register a UDF function which can be used in SQL expression. + */ + public void registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) { + schema.add(functionName, ScalarFunctionImpl.create(clazz, BeamSqlUdf.UDF_METHOD)); + } + + /** + * Register {@link SerializableFunction} as a UDF function which can be used in SQL expression. + * Note, {@link SerializableFunction} must have a constructor without arguments. + */ + public void registerUdf(String functionName, SerializableFunction sfn) { + schema.add(functionName, ScalarFunctionImpl.create(sfn.getClass(), "apply")); + } + + /** + * Register a UDAF function which can be used in GROUP-BY expression. + * See {@link org.apache.beam.sdk.transforms.Combine.CombineFn} on how to implement a UDAF. + */ + public void registerUdaf(String functionName, Combine.CombineFn combineFn) { + schema.add(functionName, new UdafImpl(combineFn)); + } + + /** + * Registers a {@link BaseBeamTable} which can be used for all subsequent queries. + * + */ + public void registerTable(String tableName, BeamSqlTable table) { + schema.add(tableName, new BeamCalciteTable(table.getRowType())); + planner.getSourceTables().put(tableName, table); + } + + /** + * Find {@link BaseBeamTable} by table name. + */ + public BeamSqlTable findTable(String tableName){ + return planner.getSourceTables().get(tableName); + } + + private static class BeamCalciteTable implements ScannableTable, Serializable { + private BeamRecordSqlType beamSqlRowType; + public BeamCalciteTable(BeamRecordSqlType beamSqlRowType) { + this.beamSqlRowType = beamSqlRowType; + } + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return CalciteUtils.toCalciteRowType(this.beamSqlRowType) + .apply(BeamQueryPlanner.TYPE_FACTORY); + } + + @Override + public Enumerable<Object[]> scan(DataContext root) { + // not used as Beam SQL uses its own execution engine + return null; + } + + /** + * Not used {@link Statistic} to optimize the plan. + */ + @Override + public Statistic getStatistic() { + return Statistics.UNKNOWN; + } + + /** + * all sources are treated as TABLE in Beam SQL. + */ + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.TABLE; + } + } + + public BeamQueryPlanner getPlanner() { + return planner; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/package-info.java new file mode 100644 index 0000000..de237d6 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation classes of BeamSql. + */ +package org.apache.beam.sdk.extensions.sql.impl; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java index b421bc3..410c783 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java @@ -23,10 +23,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; -import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; +import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; +import org.apache.beam.sdk.extensions.sql.impl.schema.BeamSqlTable; import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -67,7 +68,7 @@ public class BeamQueryPlanner { private static final Logger LOG = LoggerFactory.getLogger(BeamQueryPlanner.class); protected final Planner planner; - private Map<String, BaseBeamTable> sourceTables = new HashMap<>(); + private Map<String, BeamSqlTable> sourceTables = new HashMap<>(); public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl( RelDataTypeSystem.DEFAULT); @@ -156,7 +157,7 @@ public class BeamQueryPlanner { return planner.validate(sqlNode); } - public Map<String, BaseBeamTable> getSourceTables() { + public Map<String, BeamSqlTable> getSourceTables() { return sourceTables; } http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java index 4b557f9..e49e79c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java @@ -21,10 +21,10 @@ import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.coders.BeamRecordCoder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.WithKeys; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java index 8fe5be4..9d36a47 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.extensions.sql.impl.rel; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor; import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor; import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlFilterFn; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java index 1e3eb4c..7bb08c2 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java @@ -19,8 +19,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import com.google.common.base.Joiner; import java.util.List; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.schema.BeamSqlTable; import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -65,7 +65,7 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode { String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); - BaseBeamTable targetTable = sqlEnv.findTable(sourceName); + BeamSqlTable targetTable = sqlEnv.findTable(sourceName); upstream.apply(stageName, targetTable.buildIOWriter()); http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java index 254f990..1e4f506 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java @@ -18,9 +18,9 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import com.google.common.base.Joiner; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.schema.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -53,7 +53,7 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode { return sourceStream; } else { //If not, the source PColection is provided with BaseBeamTable.buildIOReader(). - BaseBeamTable sourceTable = sqlEnv.findTable(sourceName); + BeamSqlTable sourceTable = sqlEnv.findTable(sourceName); return sourceTable.buildIOReader(inputPCollections.getPipeline()) .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder()); } http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java index 5919329..1ffb636 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java @@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.util.List; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java index 5ac9575..cc26aa6 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java @@ -25,10 +25,10 @@ import java.util.Map; import java.util.Set; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java index b55252a..6f5dff2 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java @@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.util.List; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java index b1ff629..501feb3 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java @@ -18,7 +18,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.util.List; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor; import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor; import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlProjectFn; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java index b8b4293..9e8d46d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.extensions.sql.impl.rel; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -33,6 +33,7 @@ public interface BeamRelNode extends RelNode { * {@code BeamQueryPlanner} visits it with a DFS(Depth-First-Search) * algorithm. */ - PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) + PCollection<BeamRecord> buildBeamPipeline( + PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) throws Exception; } http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java index f9cbf4f..a1f3e2b 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java @@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.io.Serializable; import java.util.List; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSetOperatorsTransforms; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java index 80f3c97..d658638 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java @@ -25,9 +25,9 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.ParDo; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java index 63ebdf3..85d676e 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java @@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.util.List; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java index c4caff3..d684294 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java @@ -21,10 +21,10 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.List; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; -import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BaseBeamTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BaseBeamTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BaseBeamTable.java new file mode 100644 index 0000000..73e0863 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BaseBeamTable.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.schema; + +import java.io.Serializable; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; + +/** + * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}. + */ +public abstract class BaseBeamTable implements BeamSqlTable, Serializable { + protected BeamRecordSqlType beamSqlRowType; + public BaseBeamTable(BeamRecordSqlType beamSqlRowType) { + this.beamSqlRowType = beamSqlRowType; + } + + @Override public BeamRecordSqlType getRowType() { + return beamSqlRowType; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamIOType.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamIOType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamIOType.java new file mode 100644 index 0000000..5ced467 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamIOType.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.schema; + +import java.io.Serializable; + +/** + * Type as a source IO, determined whether it's a STREAMING process, or batch + * process. + */ +public enum BeamIOType implements Serializable { + BOUNDED, UNBOUNDED; +} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java new file mode 100644 index 0000000..31e60e0 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.schema; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.BeamRecord; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.PDone; + +/** + * {@code BeamPCollectionTable} converts a {@code PCollection<BeamSqlRow>} as a virtual table, + * then a downstream query can query directly. + */ +public class BeamPCollectionTable extends BaseBeamTable { + private BeamIOType ioType; + private transient PCollection<BeamRecord> upstream; + + protected BeamPCollectionTable(BeamRecordSqlType beamSqlRowType) { + super(beamSqlRowType); + } + + public BeamPCollectionTable(PCollection<BeamRecord> upstream, + BeamRecordSqlType beamSqlRowType){ + this(beamSqlRowType); + ioType = upstream.isBounded().equals(IsBounded.BOUNDED) + ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED; + this.upstream = upstream; + } + + @Override + public BeamIOType getSourceType() { + return ioType; + } + + @Override + public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) { + return upstream; + } + + @Override + public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() { + throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as target"); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlTable.java new file mode 100644 index 0000000..46fba59 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlTable.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.schema; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.BeamRecord; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; + +/** + * This interface defines a Beam Sql Table. + */ +public interface BeamSqlTable { + /** + * In Beam SQL, there's no difference between a batch query and a streaming + * query. {@link BeamIOType} is used to validate the sources. + */ + BeamIOType getSourceType(); + + /** + * create a {@code PCollection<BeamSqlRow>} from source. + * + */ + PCollection<BeamRecord> buildIOReader(Pipeline pipeline); + + /** + * create a {@code IO.write()} instance to write to target. + * + */ + PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter(); + + /** + * Get the schema info of the table. + */ + BeamRecordSqlType getRowType(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java new file mode 100644 index 0000000..6f7f09b --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.schema; + +import java.io.IOException; +import java.io.StringReader; +import java.io.StringWriter; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.values.BeamRecord; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.NlsString; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVPrinter; +import org.apache.commons.csv.CSVRecord; + +/** + * Utility methods for working with {@code BeamTable}. + */ +public final class BeamTableUtils { + public static BeamRecord csvLine2BeamSqlRow( + CSVFormat csvFormat, + String line, + BeamRecordSqlType beamRecordSqlType) { + List<Object> fieldsValue = new ArrayList<>(beamRecordSqlType.getFieldCount()); + try (StringReader reader = new StringReader(line)) { + CSVParser parser = csvFormat.parse(reader); + CSVRecord rawRecord = parser.getRecords().get(0); + + if (rawRecord.size() != beamRecordSqlType.getFieldCount()) { + throw new IllegalArgumentException(String.format( + "Expect %d fields, but actually %d", + beamRecordSqlType.getFieldCount(), rawRecord.size() + )); + } else { + for (int idx = 0; idx < beamRecordSqlType.getFieldCount(); idx++) { + String raw = rawRecord.get(idx); + fieldsValue.add(autoCastField(beamRecordSqlType.getFieldTypeByIndex(idx), raw)); + } + } + } catch (IOException e) { + throw new IllegalArgumentException("decodeRecord failed!", e); + } + return new BeamRecord(beamRecordSqlType, fieldsValue); + } + + public static String beamSqlRow2CsvLine(BeamRecord row, CSVFormat csvFormat) { + StringWriter writer = new StringWriter(); + try (CSVPrinter printer = csvFormat.print(writer)) { + for (int i = 0; i < row.getFieldCount(); i++) { + printer.print(row.getFieldValue(i).toString()); + } + printer.println(); + } catch (IOException e) { + throw new IllegalArgumentException("encodeRecord failed!", e); + } + return writer.toString(); + } + + public static Object autoCastField(int fieldType, Object rawObj) { + if (rawObj == null) { + return null; + } + + SqlTypeName columnType = CalciteUtils.toCalciteType(fieldType); + // auto-casting for numberics + if ((rawObj instanceof String && SqlTypeName.NUMERIC_TYPES.contains(columnType)) + || (rawObj instanceof BigDecimal && columnType != SqlTypeName.DECIMAL)) { + String raw = rawObj.toString(); + switch (columnType) { + case TINYINT: + return Byte.valueOf(raw); + case SMALLINT: + return Short.valueOf(raw); + case INTEGER: + return Integer.valueOf(raw); + case BIGINT: + return Long.valueOf(raw); + case FLOAT: + return Float.valueOf(raw); + case DOUBLE: + return Double.valueOf(raw); + default: + throw new UnsupportedOperationException( + String.format("Column type %s is not supported yet!", columnType)); + } + } else if (SqlTypeName.CHAR_TYPES.contains(columnType)) { + // convert NlsString to String + if (rawObj instanceof NlsString) { + return ((NlsString) rawObj).getValue(); + } else { + return rawObj; + } + } else { + return rawObj; + } + } +}
