Repository: beam Updated Branches: refs/heads/DSL_SQL 3625dbd9e -> f1c2b6540
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java new file mode 100644 index 0000000..3816063 --- /dev/null +++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.beam.dsls.sql.schema; + +import java.io.Serializable; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema.TableType; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Statistics; +import org.beam.dsls.sql.planner.BeamQueryPlanner; + +/** + * Base class for Beam SQL tables; each IO in Beam exposes one table schema by extending {@link BaseBeamTable}. + */ +public abstract class BaseBeamTable implements ScannableTable, Serializable { + + /** + * Serialization version id. + */ + private static final long serialVersionUID = -1262988061830914193L; + private RelDataType relDataType; // NOTE(review): not transient although this class is Serializable; confirm the Calcite RelDataType implementation is Serializable. + /** Field names and SQL types derived from the row type in the constructor. */ + protected BeamSQLRecordType beamSqlRecordType; + /** Resolves {@code protoRowType} with {@link BeamQueryPlanner#TYPE_FACTORY} and derives {@link #beamSqlRecordType} from the result. */ + public BaseBeamTable(RelProtoDataType protoRowType) { + this.relDataType = protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY); + this.beamSqlRecordType = BeamSQLRecordType.from(relDataType); + } + + /** + * In Beam SQL, there's no difference between a batch query and a streaming + * query. {@link BeamIOType} is used to validate the sources. + */ + public abstract BeamIOType getSourceType(); + + /** + * Creates a {@code IO.read()}-style transform that reads {@link BeamSQLRow}s from the source. + * + */ + public abstract PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader(); + + /** + * Creates a {@code IO.write()}-style transform that writes {@link BeamSQLRow}s to the target. + * + */ + public abstract PTransform<?
super PCollection<BeamSQLRow>, PDone> buildIOWriter(); + + @Override + public Enumerable<Object[]> scan(DataContext root) { + // not used as Beam SQL uses its own execution engine; callers receive null rather than an Enumerable + return null; + } + /** Returns the row type resolved in the constructor; {@code typeFactory} is not used. */ + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return relDataType; + } + + /** + * No {@link Statistic} is used to optimize the plan; always returns {@link Statistics#UNKNOWN}. + */ + @Override + public Statistic getStatistic() { + return Statistics.UNKNOWN; + } + + /** + * All sources are treated as {@link TableType#TABLE} in Beam SQL. + */ + @Override + public TableType getJdbcTableType() { + return TableType.TABLE; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java new file mode 100644 index 0000000..5e55b0f --- /dev/null +++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.beam.dsls.sql.schema; + +import java.io.Serializable; + +/** + * Type of a source IO: whether it is a bounded (batch) source or an unbounded + * (STREAMING) source. + */ +public enum BeamIOType implements Serializable { + BOUNDED, UNBOUNDED; +} http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java new file mode 100644 index 0000000..dc8e381 --- /dev/null +++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.beam.dsls.sql.schema; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * Field names and SQL types of the fields in a {@link BeamSQLRow}.
+ * + */ +//@DefaultCoder(BeamSQLRecordTypeCoder.class) +public class BeamSQLRecordType implements Serializable { + /** + * Serialization version id. + */ + private static final long serialVersionUID = -5318734648766104712L; + private List<String> fieldsName = new ArrayList<>(); + private List<SqlTypeName> fieldsType = new ArrayList<>(); + /** Builds a record type from the name and {@link SqlTypeName} of each field of {@code tableInfo}, in field order. */ + public static BeamSQLRecordType from(RelDataType tableInfo) { + BeamSQLRecordType record = new BeamSQLRecordType(); + for (RelDataTypeField f : tableInfo.getFieldList()) { + record.fieldsName.add(f.getName()); + record.fieldsType.add(f.getType().getSqlTypeName()); + } + return record; + } + /** Returns the number of fields, taken from the field-name list. */ + public int size() { + return fieldsName.size(); + } + /** Returns the live field-name list (not a copy); BeamSQLRecordTypeCoder appends to it directly. */ + public List<String> getFieldsName() { + return fieldsName; + } + /** Replaces the field-name list; the given list is stored as-is, not copied. */ + public void setFieldsName(List<String> fieldsName) { + this.fieldsName = fieldsName; + } + + public List<SqlTypeName> getFieldsType() { + return fieldsType; + } + + public void setFieldsType(List<SqlTypeName> fieldsType) { + this.fieldsType = fieldsType; + } + // NOTE(review): no equals()/hashCode() override; BeamSQLRow.equals() compares toString() output, which embeds this string. + @Override + public String toString() { + return "RecordType [fieldsName=" + fieldsName + ", fieldsType=" + fieldsType + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java new file mode 100644 index 0000000..2989cb9 --- /dev/null +++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.beam.dsls.sql.schema; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StandardCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * A {@link Coder} for {@link BeamSQLRecordType}. 
+ * + */ +public class BeamSQLRecordTypeCoder extends StandardCoder<BeamSQLRecordType> { + private static final StringUtf8Coder stringCoder = StringUtf8Coder.of(); + private static final VarIntCoder intCoder = VarIntCoder.of(); + + private static final BeamSQLRecordTypeCoder INSTANCE = new BeamSQLRecordTypeCoder(); + private BeamSQLRecordTypeCoder(){} + + public static BeamSQLRecordTypeCoder of() { + return INSTANCE; + } + /** Writes the field count, then every field name, then every {@link SqlTypeName} name, all in nested context. */ + @Override + public void encode(BeamSQLRecordType value, OutputStream outStream, + org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException { + Context nested = context.nested(); + intCoder.encode(value.size(), outStream, nested); + for(String fieldName : value.getFieldsName()){ + stringCoder.encode(fieldName, outStream, nested); + } + for(SqlTypeName fieldType : value.getFieldsType()){ + stringCoder.encode(fieldType.name(), outStream, nested); + } + outStream.flush(); + } + /** Reads the layout written by {@link #encode}. */ + @Override + public BeamSQLRecordType decode(InputStream inStream, + org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException { + BeamSQLRecordType typeRecord = new BeamSQLRecordType(); + Context nested = context.nested(); + int size = intCoder.decode(inStream, nested); + for(int idx=0; idx<size; ++idx){ + typeRecord.getFieldsName().add(stringCoder.decode(inStream, nested)); + } + for(int idx=0; idx<size; ++idx){ + typeRecord.getFieldsType().add(SqlTypeName.valueOf(stringCoder.decode(inStream, nested))); + } + return typeRecord; + } + /** Returns an empty list: {@link BeamSQLRecordType} has no type parameters. */ + @Override + public List<?
extends Coder<?>> getCoderArguments() { + // Not a parameterized type: Coder#getCoderArguments expects an empty list here, not null. + return java.util.Collections.emptyList(); + } + + @Override + public void verifyDeterministic() + throws org.apache.beam.sdk.coders.Coder.NonDeterministicException { + // Nothing to check: only VarIntCoder and StringUtf8Coder are used. + + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java new file mode 100644 index 0000000..db93168 --- /dev/null +++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.beam.dsls.sql.schema; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * Represents a generic ROW record in Beam SQL.
+ * + */ +public class BeamSQLRow implements Serializable { + /** + * Serialization version id. + */ + private static final long serialVersionUID = 4569220242480160895L; + + private List<Integer> nullFields = new ArrayList<>(); + private List<Object> dataValues; + private BeamSQLRecordType dataType; + + public BeamSQLRow(BeamSQLRecordType dataType) { + this.dataType = dataType; + this.dataValues = new ArrayList<>(); + // Every slot starts out NULL (and is tracked in nullFields) until addField() sets it. + for(int idx=0; idx<dataType.size(); ++idx){ + dataValues.add(null); + nullFields.add(idx); + } + } + + public BeamSQLRow(BeamSQLRecordType dataType, List<Object> dataValues) { + this.dataValues = dataValues; + this.dataType = dataType; + // Track nulls so getFieldValue() and BeamSqlRowCoder treat them as NULL, not as type mismatches. + for(int idx=0; idx<dataValues.size(); ++idx){ + if(dataValues.get(idx) == null){ + nullFields.add(idx); + } + } + } + + public void addField(String fieldName, Object fieldValue) { + addField(dataType.getFieldsName().indexOf(fieldName), fieldValue); + } + + /** + * Sets the field at {@code index}, checking the value's Java type against the field's SQL + * type. A {@code null} value marks the field as NULL. + * + * @throws InvalidFieldException if the value does not match the field's SQL type + * @throws UnsupportedDataTypeException if the field's SQL type is not supported (non-null values only) + */ + public void addField(int index, Object fieldValue) { + if(fieldValue == null){ + dataValues.set(index, fieldValue); + if(!nullFields.contains(index)){nullFields.add(index);} + return; + } + + SqlTypeName fieldType = dataType.getFieldsType().get(index); + switch (fieldType) { + case INTEGER: + case SMALLINT: + case TINYINT: + if(!(fieldValue instanceof Integer)){ + throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case DOUBLE: + if(!(fieldValue instanceof Double)){ + throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case BIGINT: + if(!(fieldValue instanceof Long)){ + throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case FLOAT: + if(!(fieldValue instanceof Float)){ + throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case VARCHAR: + if(!(fieldValue instanceof String)){ + throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case TIME: + case TIMESTAMP: + if(!(fieldValue instanceof Date)){ + throw
new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + default: + throw new UnsupportedDataTypeException(fieldType); + } + // A non-null value replaces any earlier NULL at this index. + if(nullFields.contains(index)){nullFields.remove(Integer.valueOf(index));} + dataValues.set(index, fieldValue); + } + + + public int getInteger(int idx) { + return (Integer) getFieldValue(idx); + } + + public double getDouble(int idx) { + return (Double) getFieldValue(idx); + } + + public long getLong(int idx) { + return (Long) getFieldValue(idx); + } + + public String getString(int idx) { + return (String) getFieldValue(idx); + } + + public Date getDate(int idx) { + return (Date) getFieldValue(idx); + } + + public Object getFieldValue(String fieldName) { + return getFieldValue(dataType.getFieldsName().indexOf(fieldName)); + } + /** Returns the value at {@code fieldIdx}, or null if the field is NULL, after checking it against the field's SQL type. */ + public Object getFieldValue(int fieldIdx) { + if(nullFields.contains(fieldIdx)){ + return null; + } + + Object fieldValue = dataValues.get(fieldIdx); + SqlTypeName fieldType = dataType.getFieldsType().get(fieldIdx); + + switch (fieldType) { + case INTEGER: + case SMALLINT: + case TINYINT: + if(!(fieldValue instanceof Integer)){ + throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + }else{ + return fieldValue; + } + case DOUBLE: + if(!(fieldValue instanceof Double)){ + throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + }else{ + return fieldValue; + } + case BIGINT: + if(!(fieldValue instanceof Long)){ + throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + }else{ + return fieldValue; + } + case FLOAT: + if(!(fieldValue instanceof Float)){ + throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + }else{ + return fieldValue; + } + case VARCHAR: + if(!(fieldValue instanceof String)){ + throw new
InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + }else{ + return fieldValue; + } + case TIME: + case TIMESTAMP: + if(!(fieldValue instanceof Date)){ + throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + }else{ + return fieldValue; + } + default: + throw new UnsupportedDataTypeException(fieldType); + } + } + + public int size() { + return dataValues.size(); + } + + public List<Object> getDataValues() { + return dataValues; + } + + public void setDataValues(List<Object> dataValues) { + this.dataValues = dataValues; + } + + public BeamSQLRecordType getDataType() { + return dataType; + } + + public void setDataType(BeamSQLRecordType dataType) { + this.dataType = dataType; + } + + public void setNullFields(List<Integer> nullFields) { + this.nullFields = nullFields; + } + + public List<Integer> getNullFields() { + return nullFields; + } + + @Override + public String toString() { + return "BeamSQLRow [dataValues=" + dataValues + ", dataType=" + dataType + "]"; + } + + /** + * Returns the fields as comma-separated {@code name=value} pairs ("" for a row with no fields).
+ */ + public String valueInString() { + StringBuffer sb = new StringBuffer(); + for (int idx = 0; idx < size(); ++idx) { + sb.append(String.format(",%s=%s", dataType.getFieldsName().get(idx), getFieldValue(idx))); + } + return sb.length() == 0 ? "" : sb.substring(1); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + BeamSQLRow other = (BeamSQLRow) obj; + return toString().equals(other.toString()); + } + + /** + * Returns a hash consistent with {@link #equals(Object)}, which compares {@link #toString()}. + */ + @Override + public int hashCode() { + return toString().hashCode(); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java new file mode 100644 index 0000000..00af18d --- /dev/null +++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.beam.dsls.sql.schema; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Date; +import java.util.List; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.DoubleCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.StandardCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; + +/** + * A {@link Coder} that encodes {@link BeamSQLRow}: its record type, its NULL field indexes, then each non-NULL value. + * + */ +public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{ + private static final BeamSQLRecordTypeCoder recordTypeCoder = BeamSQLRecordTypeCoder.of(); + + private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of()); + + private static final StringUtf8Coder stringCoder = StringUtf8Coder.of(); + private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of(); + private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of(); + private static final DoubleCoder doubleCoder = DoubleCoder.of(); + + private static final BeamSqlRowCoder INSTANCE = new BeamSqlRowCoder(); + private BeamSqlRowCoder(){} + + public static BeamSqlRowCoder of() { + return INSTANCE; + } + + @Override + public void encode(BeamSQLRow value, OutputStream outStream, + org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException { + // Every component is encoded in nested context so its extent is recoverable; decode() mirrors this. + Context nested = context.nested(); + recordTypeCoder.encode(value.getDataType(), outStream, nested); + listCoder.encode(value.getNullFields(), outStream, nested); + + for (int idx = 0; idx < value.size(); ++idx) { + if(value.getNullFields().contains(idx)){ + continue; + } + + switch (value.getDataType().getFieldsType().get(idx)) { + case INTEGER: + case SMALLINT: + case TINYINT: + intCoder.encode(value.getInteger(idx), outStream, nested); + break;
+ case DOUBLE: + case FLOAT: + // FLOAT values are java.lang.Float, so widen via Number (lossless) instead of casting to Double. + doubleCoder.encode(((Number) value.getFieldValue(idx)).doubleValue(), outStream, nested); + break; + case BIGINT: + longCoder.encode(value.getLong(idx), outStream, nested); + break; + case VARCHAR: + stringCoder.encode(value.getString(idx), outStream, nested); + break; + case TIME: + case TIMESTAMP: + longCoder.encode(value.getDate(idx).getTime(), outStream, nested); + break; + default: + throw new UnsupportedDataTypeException(value.getDataType().getFieldsType().get(idx)); + } + } + } + + @Override + public BeamSQLRow decode(InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) + throws CoderException, IOException { + Context nested = context.nested(); // must mirror encode(), which writes every component nested + BeamSQLRecordType type = recordTypeCoder.decode(inStream, nested); + List<Integer> nullFields = listCoder.decode(inStream, nested); + BeamSQLRow record = new BeamSQLRow(type); + record.setNullFields(nullFields); + for (int idx = 0; idx < type.size(); ++idx) { + if(nullFields.contains(idx)){ + continue; + } + + switch (type.getFieldsType().get(idx)) { + case INTEGER: + case SMALLINT: + case TINYINT: + record.addField(idx, intCoder.decode(inStream, nested)); + break; + case DOUBLE: + record.addField(idx, doubleCoder.decode(inStream, nested)); + break; + case FLOAT: + record.addField(idx, doubleCoder.decode(inStream, nested).floatValue()); + break; + case BIGINT: + record.addField(idx, longCoder.decode(inStream, nested)); + break; + case VARCHAR: + record.addField(idx, stringCoder.decode(inStream, nested)); + break; + case TIME: + case TIMESTAMP: + record.addField(idx, new Date(longCoder.decode(inStream, nested))); + break; + default: + throw new UnsupportedDataTypeException(type.getFieldsType().get(idx)); + } + } + + return record; + } + /** No component coders: {@link BeamSQLRow} is not a parameterized type. */ + @Override + public List<?
extends Coder<?>> getCoderArguments() { + return java.util.Collections.emptyList(); + } + + @Override + public void verifyDeterministic() + throws org.apache.beam.sdk.coders.Coder.NonDeterministicException { + // NOTE(review): nothing is verified here although values go through DoubleCoder, which may not be deterministic in Beam; confirm before using rows as grouping keys. + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java new file mode 100644 index 0000000..6240426 --- /dev/null +++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.beam.dsls.sql.schema; +/** Unchecked exception signalling that a field value does not match the SQL type declared for that field. */ +public class InvalidFieldException extends RuntimeException { + + public InvalidFieldException() { + super(); + } + + public InvalidFieldException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java new file mode 100644 index 0000000..9a2235e --- /dev/null +++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.beam.dsls.sql.schema; + +import org.apache.calcite.sql.type.SqlTypeName; +/** Thrown when a {@link SqlTypeName} is not supported by Beam SQL rows and coders. */ +public class UnsupportedDataTypeException extends RuntimeException { + + public UnsupportedDataTypeException(SqlTypeName unsupportedType){ + super(String.format("Not support data type [%s]", unsupportedType)); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java new file mode 100644 index 0000000..2570763 --- /dev/null +++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.beam.dsls.sql.schema.kafka; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.sql.type.SqlTypeName; +import org.beam.dsls.sql.schema.BeamSQLRecordType; +import org.beam.dsls.sql.schema.BeamSQLRow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Kafka topic that saves records in CSV format (UTF-8, comma-delimited, no quoting). + * + */ +public class BeamKafkaCSVTable extends BeamKafkaTable { + + /** + * Serialization version id. + */ + private static final long serialVersionUID = 4754022536543333984L; + + public static final String DELIMITER = ","; + private static final Logger LOG = LoggerFactory.getLogger(BeamKafkaCSVTable.class); + + public BeamKafkaCSVTable(RelProtoDataType protoRowType, String bootstrapServers, + List<String> topics) { + super(protoRowType, bootstrapServers, topics); + } + + @Override + public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>> + getPTransformForInput() { + return new CsvRecorderDecoder(beamSqlRecordType); + } + + @Override + public PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>> + getPTransformForOutput() { + return new CsvRecorderEncoder(beamSqlRecordType); + } + + /** + * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamSQLRow}.
+ * Only the record value is parsed (the key is ignored); rows with the wrong number of fields or malformed numbers are logged and dropped. + */ + public static class CsvRecorderDecoder + extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>> { + private BeamSQLRecordType recordType; + + public CsvRecorderDecoder(BeamSQLRecordType recordType) { + this.recordType = recordType; + } + + @Override + public PCollection<BeamSQLRow> expand(PCollection<KV<byte[], byte[]>> input) { + return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamSQLRow>() { + @ProcessElement + public void processElement(ProcessContext c) { + String rowInString = new String(c.element().getValue(), StandardCharsets.UTF_8); + // limit -1 keeps trailing empty cells, so "a,b," yields three parts rather than two. + String[] parts = rowInString.split(BeamKafkaCSVTable.DELIMITER, -1); + if (parts.length != recordType.size()) { + LOG.error("invalid record: {}", rowInString); + } else { + try { + BeamSQLRow sourceRecord = new BeamSQLRow(recordType); + for (int idx = 0; idx < parts.length; ++idx) { + sourceRecord.addField(idx, parseField(recordType.getFieldsType().get(idx), parts[idx])); + } + c.output(sourceRecord); + } catch (NumberFormatException e) { + // Malformed numeric cell: drop the row, as for a wrong field count. + LOG.error("invalid record: {}", rowInString, e); + } + } + } + })); + } + + /** + * Converts one CSV cell to the Java type {@link BeamSQLRow#addField(int, Object)} expects for + * {@code type}. VARCHAR and all other types are passed through unchanged, so + * {@code addField} still rejects types this CSV format cannot represent. + */ + private static Object parseField(SqlTypeName type, String cell) { + switch (type) { + case INTEGER: + case SMALLINT: + case TINYINT: + return Integer.valueOf(cell); + case BIGINT: + return Long.valueOf(cell); + case DOUBLE: + return Double.valueOf(cell); + case FLOAT: + return Float.valueOf(cell); + default: + return cell; + } + } + } + + /** + * A PTransform to convert {@link BeamSQLRow} to {@code KV<byte[], byte[]>}.
+ * Writes an empty key and a UTF-8, comma-joined value. + */ + public static class CsvRecorderEncoder + extends PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>> { + private BeamSQLRecordType recordType; + + public CsvRecorderEncoder(BeamSQLRecordType recordType) { + this.recordType = recordType; + } + + @Override + public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamSQLRow> input) { + return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSQLRow, KV<byte[], byte[]>>() { + @ProcessElement + public void processElement(ProcessContext c) { + BeamSQLRow in = c.element(); + StringBuffer sb = new StringBuffer(); + for (int idx = 0; idx < in.size(); ++idx) { + sb.append(DELIMITER); + sb.append(in.getFieldValue(idx).toString()); // NOTE(review): NullPointerException on NULL fields; confirm the intended CSV encoding of NULL. + } + c.output(KV.of(new byte[] {}, (sb.length() == 0 ? "" : sb.substring(1)).getBytes(StandardCharsets.UTF_8))); + } + })); + + } + + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java new file mode 100644 index 0000000..482383b --- /dev/null +++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.beam.dsls.sql.schema.kafka; + +import static com.google.common.base.Preconditions.checkArgument; +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.io.kafka.KafkaIO; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.beam.dsls.sql.schema.BaseBeamTable; +import org.beam.dsls.sql.schema.BeamIOType; +import org.beam.dsls.sql.schema.BeamSQLRow; + +/** + * {@code BeamKafkaTable} represent a Kafka topic, as source or target. Need to + * extend to convert between {@code BeamSQLRow} and {@code KV<byte[], byte[]>}. 
+ * + */ +public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable { + + /** + * + */ + private static final long serialVersionUID = -634715473399906527L; + + private String bootstrapServers; + private List<String> topics; + private Map<String, Object> configUpdates; + + protected BeamKafkaTable(RelProtoDataType protoRowType) { + super(protoRowType); + } + + public BeamKafkaTable(RelProtoDataType protoRowType, String bootstrapServers, + List<String> topics) { + super(protoRowType); + this.bootstrapServers = bootstrapServers; + this.topics = topics; + } + + public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) { + this.configUpdates = configUpdates; + return this; + } + + @Override + public BeamIOType getSourceType() { + return BeamIOType.UNBOUNDED; + } + + public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>> + getPTransformForInput(); + + public abstract PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>> + getPTransformForOutput(); + + @Override + public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() { + return new PTransform<PBegin, PCollection<BeamSQLRow>>() { + + @Override + public PCollection<BeamSQLRow> expand(PBegin input) { + return input.apply("read", + KafkaIO.<byte[], byte[]>read().withBootstrapServers(bootstrapServers).withTopics(topics) + .updateConsumerProperties(configUpdates).withKeyCoder(ByteArrayCoder.of()) + .withValueCoder(ByteArrayCoder.of()).withoutMetadata()) + .apply("in_format", getPTransformForInput()); + + } + }; + } + + @Override + public PTransform<? 
super PCollection<BeamSQLRow>, PDone> buildIOWriter() { + checkArgument(topics != null && topics.size() == 1, + "Only one topic can be acceptable as output."); + + return new PTransform<PCollection<BeamSQLRow>, PDone>() { + @Override + public PDone expand(PCollection<BeamSQLRow> input) { + return input.apply("out_reformat", getPTransformForOutput()).apply("persistent", + KafkaIO.<byte[], byte[]>write().withBootstrapServers(bootstrapServers) + .withTopic(topics.get(0)).withKeyCoder(ByteArrayCoder.of()) + .withValueCoder(ByteArrayCoder.of())); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java new file mode 100644 index 0000000..822fce7 --- /dev/null +++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * table schema for KafkaIO. 
+ */ +package org.beam.dsls.sql.schema.kafka; http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java new file mode 100644 index 0000000..ef9cc7d --- /dev/null +++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * define table schema, to map with Beam IO components. 
+ * + */ +package org.beam.dsls.sql.schema; http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java new file mode 100644 index 0000000..06db280 --- /dev/null +++ b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.beam.dsls.sql.transform; + +import java.util.List; +import org.apache.beam.sdk.transforms.DoFn; +import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor; +import org.beam.dsls.sql.rel.BeamFilterRel; +import org.beam.dsls.sql.schema.BeamSQLRow; + +/** + * {@code BeamSQLFilterFn} is the executor for a {@link BeamFilterRel} step. 
+ * + */ +public class BeamSQLFilterFn extends DoFn<BeamSQLRow, BeamSQLRow> { + /** + * + */ + private static final long serialVersionUID = -1256111753670606705L; + + private String stepName; + private BeamSQLExpressionExecutor executor; + + public BeamSQLFilterFn(String stepName, BeamSQLExpressionExecutor executor) { + super(); + this.stepName = stepName; + this.executor = executor; + } + + @Setup + public void setup() { + executor.prepare(); + } + + @ProcessElement + public void processElement(ProcessContext c) { + BeamSQLRow in = c.element(); + + List<Object> result = executor.execute(in); + + if ((Boolean) result.get(0)) { + c.output(in); + } + } + + @Teardown + public void close() { + executor.close(); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java new file mode 100644 index 0000000..1014c0d --- /dev/null +++ b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.beam.dsls.sql.transform; + +import org.apache.beam.sdk.transforms.DoFn; +import org.beam.dsls.sql.schema.BeamSQLRow; + +/** + * A test PTransform to display output in console. + * + */ +public class BeamSQLOutputToConsoleFn extends DoFn<BeamSQLRow, Void> { + /** + * + */ + private static final long serialVersionUID = -1256111753670606705L; + + private String stepName; + + public BeamSQLOutputToConsoleFn(String stepName) { + super(); + this.stepName = stepName; + } + + @ProcessElement + public void processElement(ProcessContext c) { + System.out.println("Output: " + c.element().getDataValues()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java new file mode 100644 index 0000000..12061d2 --- /dev/null +++ b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.beam.dsls.sql.transform; + +import java.util.List; +import org.apache.beam.sdk.transforms.DoFn; +import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor; +import org.beam.dsls.sql.rel.BeamProjectRel; +import org.beam.dsls.sql.schema.BeamSQLRecordType; +import org.beam.dsls.sql.schema.BeamSQLRow; + +/** + * + * {@code BeamSQLProjectFn} is the executor for a {@link BeamProjectRel} step. + * + */ +public class BeamSQLProjectFn extends DoFn<BeamSQLRow, BeamSQLRow> { + + /** + * + */ + private static final long serialVersionUID = -1046605249999014608L; + private String stepName; + private BeamSQLExpressionExecutor executor; + private BeamSQLRecordType outputRecordType; + + public BeamSQLProjectFn(String stepName, BeamSQLExpressionExecutor executor, + BeamSQLRecordType outputRecordType) { + super(); + this.stepName = stepName; + this.executor = executor; + this.outputRecordType = outputRecordType; + } + + @Setup + public void setup() { + executor.prepare(); + } + + @ProcessElement + public void processElement(ProcessContext c) { + List<Object> results = executor.execute(c.element()); + + BeamSQLRow outRow = new BeamSQLRow(outputRecordType); + for (int idx = 0; idx < results.size(); ++idx) { + outRow.addField(idx, results.get(idx)); + } + + c.output(outRow); + } + + @Teardown + public void close() { + executor.close(); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java new file mode 100644 index 0000000..2607abf --- /dev/null +++ b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * {@link org.apache.beam.sdk.transforms.PTransform} used in a BeamSQL pipeline. + */ +package org.beam.dsls.sql.transform; http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/resources/log4j.properties b/dsls/sql/src/main/resources/log4j.properties new file mode 100644 index 0000000..709484b --- /dev/null +++ b/dsls/sql/src/main/resources/log4j.properties @@ -0,0 +1,23 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=ERROR,console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java new file mode 100644 index 0000000..56e45c4 --- /dev/null +++ b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.beam.dsls.sql.planner; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.beam.dsls.sql.schema.BaseBeamTable; +import org.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable; +import org.junit.BeforeClass; + +/** + * prepare {@code BeamSqlRunner} for test. + * + */ +public class BasePlanner { + public static BeamSqlRunner runner = new BeamSqlRunner(); + + @BeforeClass + public static void prepare() { + runner.addTable("ORDER_DETAILS", getTable()); + runner.addTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders")); + runner.addTable("SUB_ORDER_RAM", getTable()); + } + + private static BaseBeamTable getTable() { + final RelProtoDataType protoRowType = new RelProtoDataType() { + @Override + public RelDataType apply(RelDataTypeFactory a0) { + return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER) + .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build(); + } + }; + + return new MockedBeamSQLTable(protoRowType); + } + + public static BaseBeamTable getTable(String bootstrapServer, String topic) { + final RelProtoDataType protoRowType = new RelProtoDataType() { + @Override + public RelDataType apply(RelDataTypeFactory a0) { + return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER) + .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build(); + } + }; + + Map<String, Object> consumerPara = new HashMap<String, Object>(); + consumerPara.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + + return new 
BeamKafkaCSVTable(protoRowType, bootstrapServer, Arrays.asList(topic)) + .updateConsumerProperties(consumerPara); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java new file mode 100644 index 0000000..a77878f --- /dev/null +++ b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.beam.dsls.sql.planner; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests to explain queries. 
+ *
+ * <p>Each test compiles a SQL statement against the tables registered by {@link BasePlanner}
+ * and compares the text returned by {@code runner.explainQuery} with an expected plan.
+ *
+ * <p>NOTE(review): in this copy every nested plan line carries the same single leading space,
+ * which suggests the indentation inside the expected-plan literals was collapsed. Confirm the
+ * per-level indentation against the real explain output.
+ */
+public class BeamPlannerExplainTest extends BasePlanner {
+
+  /** A plain SELECT * becomes a projection over the IO source. */
+  @Test
+  public void selectAll() throws Exception {
+    String sql = "SELECT * FROM ORDER_DETAILS";
+    String plan = runner.explainQuery(sql);
+
+    String expectedPlan =
+        "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[$3])\n"
+        + " BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
+    Assert.assertEquals("explain doesn't match", expectedPlan, plan);
+  }
+
+  /** A WHERE clause becomes a BeamFilterRel between the projection and the source. */
+  @Test
+  public void selectWithFilter() throws Exception {
+    String sql = "SELECT " + " order_id, site_id, price " + "FROM ORDER_DETAILS "
+        + "WHERE SITE_ID = 0 and price > 20";
+    String plan = runner.explainQuery(sql);
+
+    String expectedPlan = "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n"
+        + " BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n"
+        + " BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
+    Assert.assertEquals("explain doesn't match", expectedPlan, plan);
+  }
+
+  /**
+   * INSERT ... SELECT adds a BeamIOSinkRel on top. The extra projection fills the column the
+   * INSERT does not list (order_time) with null.
+   */
+  @Test
+  public void insertSelectFilter() throws Exception {
+    String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) " + "SELECT "
+        + " order_id, site_id, price " + "FROM ORDER_DETAILS "
+        + "WHERE SITE_ID = 0 and price > 20";
+    String plan = runner.explainQuery(sql);
+
+    String expectedPlan =
+        "BeamIOSinkRel(table=[[SUB_ORDER]], operation=[INSERT], flattened=[true])\n"
+        + " BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[null])\n"
+        + " BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n"
+        + " BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n"
+        + " BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
+    Assert.assertEquals("explain doesn't match", expectedPlan, plan);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java new file mode 100644 index 0000000..eb097a9 --- /dev/null +++ b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.beam.dsls.sql.planner; + +import org.apache.beam.sdk.Pipeline; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests to execute a query. 
+ * + */ +public class BeamPlannerSubmitTest extends BasePlanner { + @Test + public void insertSelectFilter() throws Exception { + String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20"; + Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql); + runner.getPlanner().planner.close(); + + pipeline.run().waitUntilFinish(); + + Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1); + Assert.assertEquals("order_id=12345,site_id=0,price=20.5,order_time=null", MockedBeamSQLTable.CONTENT.get(0)); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java new file mode 100644 index 0000000..31f5578 --- /dev/null +++ b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.beam.dsls.sql.planner; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.beam.dsls.sql.schema.BaseBeamTable; +import org.beam.dsls.sql.schema.BeamIOType; +import org.beam.dsls.sql.schema.BeamSQLRow; + +/** + * A mock table use to check input/output. + * + */ +public class MockedBeamSQLTable extends BaseBeamTable { + + /** + * + */ + private static final long serialVersionUID = 1373168368414036932L; + + public static final List<String> CONTENT = new ArrayList<>(); + + public MockedBeamSQLTable(RelProtoDataType protoRowType) { + super(protoRowType); + } + + @Override + public BeamIOType getSourceType() { + return BeamIOType.UNBOUNDED; + } + + @Override + public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() { + BeamSQLRow row1 = new BeamSQLRow(beamSqlRecordType); + row1.addField(0, 12345L); + row1.addField(1, 0); + row1.addField(2, 10.5); + row1.addField(3, new Date()); + + BeamSQLRow row2 = new BeamSQLRow(beamSqlRecordType); + row2.addField(0, 12345L); + row2.addField(1, 1); + row2.addField(2, 20.5); + row2.addField(3, new Date()); + + BeamSQLRow row3 = new BeamSQLRow(beamSqlRecordType); + row3.addField(0, 12345L); + row3.addField(1, 0); + row3.addField(2, 20.5); + row3.addField(3, new Date()); + + BeamSQLRow row4 = new BeamSQLRow(beamSqlRecordType); + row4.addField(0, null); + row4.addField(1, null); + row4.addField(2, 20.5); + row4.addField(3, new Date()); + + return Create.of(row1, row2, row3); + } + + @Override + public PTransform<? 
super PCollection<BeamSQLRow>, PDone> buildIOWriter() { + return new OutputStore(); + } + + /** + * Keep output in {@code CONTENT} for validation. + * + */ + public static class OutputStore extends PTransform<PCollection<BeamSQLRow>, PDone> { + + @Override + public PDone expand(PCollection<BeamSQLRow> input) { + input.apply(ParDo.of(new DoFn<BeamSQLRow, Void>() { + + @Setup + public void setup() { + CONTENT.clear(); + } + + @ProcessElement + public void processElement(ProcessContext c) { + CONTENT.add(c.element().valueInString()); + } + + @Teardown + public void close() { + + } + + })); + return PDone.in(input.getPipeline()); + } + + } + +}
