http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java new file mode 100644 index 0000000..502e8c1 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.schema; + +import java.io.Serializable; + +/** + * Type as a source IO, determined whether it's a STREAMING process, or batch + * process. + */ +public enum BeamIOType implements Serializable { + BOUNDED, UNBOUNDED; +}
// http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.dsls.sql.schema;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.type.SqlTypeName;

/**
 * Field type information in {@link BeamSQLRow}: an ordered list of field names and a
 * parallel list holding each field's {@link SqlTypeName}.
 */
//@DefaultCoder(BeamSQLRecordTypeCoder.class)
public class BeamSQLRecordType implements Serializable {
  private static final long serialVersionUID = -5318734648766104712L;

  private List<String> fieldsName = new ArrayList<>();
  private List<SqlTypeName> fieldsType = new ArrayList<>();

  /**
   * Builds a record type from a Calcite row type, copying every field's name and SQL type
   * in the order of {@link RelDataType#getFieldList()}.
   */
  public static BeamSQLRecordType from(RelDataType tableInfo) {
    BeamSQLRecordType record = new BeamSQLRecordType();
    for (RelDataTypeField f : tableInfo.getFieldList()) {
      record.fieldsName.add(f.getName());
      record.fieldsType.add(f.getType().getSqlTypeName());
    }
    return record;
  }

  /** Returns the number of fields, measured as the size of the field-name list. */
  public int size() {
    return fieldsName.size();
  }

  /** Returns the live (mutable) list of field names. */
  public List<String> getFieldsName() {
    return fieldsName;
  }

  public void setFieldsName(List<String> fieldsName) {
    this.fieldsName = fieldsName;
  }

  /** Returns the live (mutable) list of field types, parallel to {@link #getFieldsName()}. */
  public List<SqlTypeName> getFieldsType() {
    return fieldsType;
  }

  public void setFieldsType(List<SqlTypeName> fieldsType) {
    this.fieldsType = fieldsType;
  }

  @Override
  public String toString() {
    return "RecordType [fieldsName=" + fieldsName + ", fieldsType=" + fieldsType + "]";
  }

}
// http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.dsls.sql.schema;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.calcite.sql.type.SqlTypeName;

/**
 * A {@link Coder} for {@link BeamSQLRecordType}.
 *
 * <p>Wire format: the field count as a VarInt, then every field name, then every field
 * type's {@link SqlTypeName#name()}, each written as a nested UTF-8 string.
 */
public class BeamSQLRecordTypeCoder extends StandardCoder<BeamSQLRecordType> {
  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
  private static final VarIntCoder intCoder = VarIntCoder.of();

  private static final BeamSQLRecordTypeCoder INSTANCE = new BeamSQLRecordTypeCoder();

  private BeamSQLRecordTypeCoder() {}

  /** Returns the shared singleton instance. */
  public static BeamSQLRecordTypeCoder of() {
    return INSTANCE;
  }

  /**
   * Encodes the record type.
   *
   * @throws CoderException if the name and type lists differ in length, because
   *     {@link #decode} reads exactly one type per name
   */
  @Override
  public void encode(BeamSQLRecordType value, OutputStream outStream,
      org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
    List<String> names = value.getFieldsName();
    List<SqlTypeName> types = value.getFieldsType();
    if (names.size() != types.size()) {
      throw new CoderException(String.format(
          "Field names (%d) and field types (%d) differ in size", names.size(), types.size()));
    }
    Context nested = context.nested();
    intCoder.encode(names.size(), outStream, nested);
    for (String fieldName : names) {
      stringCoder.encode(fieldName, outStream, nested);
    }
    for (SqlTypeName fieldType : types) {
      stringCoder.encode(fieldType.name(), outStream, nested);
    }
    outStream.flush();
  }

  @Override
  public BeamSQLRecordType decode(InputStream inStream,
      org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
    BeamSQLRecordType typeRecord = new BeamSQLRecordType();
    Context nested = context.nested();
    int size = intCoder.decode(inStream, nested);
    if (size < 0) {
      throw new CoderException("Negative field count: " + size);
    }
    for (int idx = 0; idx < size; ++idx) {
      typeRecord.getFieldsName().add(stringCoder.decode(inStream, nested));
    }
    for (int idx = 0; idx < size; ++idx) {
      typeRecord.getFieldsType().add(SqlTypeName.valueOf(stringCoder.decode(inStream, nested)));
    }
    return typeRecord;
  }

  /** This coder has no component coders, so it returns an empty list (never {@code null}). */
  @Override
  public List<? extends Coder<?>> getCoderArguments() {
    return Collections.emptyList();
  }

  /** No-op: the encoding uses only {@link VarIntCoder} and {@link StringUtf8Coder}. */
  @Override
  public void verifyDeterministic()
      throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
  }

}
// http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.dsls.sql.schema;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.calcite.sql.type.SqlTypeName;

/**
 * Represents a generic ROW record in Beam SQL: field values typed by a
 * {@link BeamSQLRecordType}, plus the indices of fields that were explicitly set to
 * {@code null}.
 *
 * <p>Supported Java representations: INTEGER/SMALLINT/TINYINT as {@link Integer}, DOUBLE as
 * {@link Double}, BIGINT as {@link Long}, FLOAT as {@link Float}, VARCHAR as {@link String},
 * and TIME/TIMESTAMP as {@link Date}. Any other type raises
 * {@link UnsupportedDataTypeException}.
 */
public class BeamSQLRow implements Serializable {
  private static final long serialVersionUID = 4569220242480160895L;

  private List<Integer> nullFields = new ArrayList<>();
  private List<Object> dataValues;
  private BeamSQLRecordType dataType;

  /** Creates a row of {@code dataType} with every value slot initialized to {@code null}. */
  public BeamSQLRow(BeamSQLRecordType dataType) {
    this.dataType = dataType;
    this.dataValues = new ArrayList<>(dataType.size());
    for (int idx = 0; idx < dataType.size(); ++idx) {
      dataValues.add(null);
    }
  }

  /** Creates a row backed directly by {@code dataValues}. The list is neither copied nor validated. */
  public BeamSQLRow(BeamSQLRecordType dataType, List<Object> dataValues) {
    this.dataValues = dataValues;
    this.dataType = dataType;
  }

  /** Sets the field named {@code fieldName}. See {@link #addField(int, Object)}. */
  public void addField(String fieldName, Object fieldValue) {
    addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
  }

  /**
   * Sets the field at {@code index}.
   *
   * <p>A {@code null} value marks the field as null. A non-null value must match the
   * field's SQL type, and it clears any earlier null marking for that field.
   *
   * @throws InvalidFieldException if the value's Java type does not match the field type
   * @throws UnsupportedDataTypeException if the field's SQL type is not supported
   */
  public void addField(int index, Object fieldValue) {
    if (fieldValue == null) {
      dataValues.set(index, null);
      if (!nullFields.contains(index)) {
        nullFields.add(index);
      }
      return;
    }

    checkFieldType(dataType.getFieldsType().get(index), fieldValue);
    dataValues.set(index, fieldValue);
    // Without this, a field that was once null would keep reading back as null.
    if (nullFields.contains(index)) {
      nullFields.remove(Integer.valueOf(index));
    }
  }

  /** Returns an INTEGER/SMALLINT/TINYINT field. Unboxing a null field throws NullPointerException. */
  public int getInteger(int idx) {
    return (Integer) getFieldValue(idx);
  }

  /** Returns a DOUBLE field. Unboxing a null field throws NullPointerException. */
  public double getDouble(int idx) {
    return (Double) getFieldValue(idx);
  }

  /** Returns a FLOAT field. Unboxing a null field throws NullPointerException. */
  public float getFloat(int idx) {
    return (Float) getFieldValue(idx);
  }

  /** Returns a BIGINT field. Unboxing a null field throws NullPointerException. */
  public long getLong(int idx) {
    return (Long) getFieldValue(idx);
  }

  /** Returns a VARCHAR field, or {@code null}. */
  public String getString(int idx) {
    return (String) getFieldValue(idx);
  }

  /** Returns a TIME/TIMESTAMP field, or {@code null}. */
  public Date getDate(int idx) {
    return (Date) getFieldValue(idx);
  }

  /** Returns the value of the field named {@code fieldName}. See {@link #getFieldValue(int)}. */
  public Object getFieldValue(String fieldName) {
    return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
  }

  /**
   * Returns the value at {@code fieldIdx}, or {@code null} if the field is marked null.
   *
   * @throws InvalidFieldException if the stored value does not match the field type
   *     (including an unset slot that was never marked null)
   * @throws UnsupportedDataTypeException if the field's SQL type is not supported
   */
  public Object getFieldValue(int fieldIdx) {
    if (nullFields.contains(fieldIdx)) {
      return null;
    }
    Object fieldValue = dataValues.get(fieldIdx);
    checkFieldType(dataType.getFieldsType().get(fieldIdx), fieldValue);
    return fieldValue;
  }

  /** Validates {@code fieldValue} against the Java class used for {@code fieldType}. */
  private static void checkFieldType(SqlTypeName fieldType, Object fieldValue) {
    Class<?> expectedClass;
    switch (fieldType) {
      case INTEGER:
      case SMALLINT:
      case TINYINT:
        expectedClass = Integer.class;
        break;
      case DOUBLE:
        expectedClass = Double.class;
        break;
      case BIGINT:
        expectedClass = Long.class;
        break;
      case FLOAT:
        expectedClass = Float.class;
        break;
      case VARCHAR:
        expectedClass = String.class;
        break;
      case TIME:
      case TIMESTAMP:
        expectedClass = Date.class;
        break;
      default:
        throw new UnsupportedDataTypeException(fieldType);
    }
    if (!expectedClass.isInstance(fieldValue)) {
      throw new InvalidFieldException(
          String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
    }
  }

  public int size() {
    return dataValues.size();
  }

  public List<Object> getDataValues() {
    return dataValues;
  }

  public void setDataValues(List<Object> dataValues) {
    this.dataValues = dataValues;
  }

  public BeamSQLRecordType getDataType() {
    return dataType;
  }

  public void setDataType(BeamSQLRecordType dataType) {
    this.dataType = dataType;
  }

  public void setNullFields(List<Integer> nullFields) {
    this.nullFields = nullFields;
  }

  public List<Integer> getNullFields() {
    return nullFields;
  }

  @Override
  public String toString() {
    return "BeamSQLRow [dataValues=" + dataValues + ", dataType=" + dataType + "]";
  }

  /**
   * Returns the data fields as comma-separated {@code name=value} pairs. A row with no
   * fields yields the empty string.
   */
  public String valueInString() {
    StringBuilder sb = new StringBuilder();
    for (int idx = 0; idx < size(); ++idx) {
      if (idx > 0) {
        sb.append(',');
      }
      sb.append(dataType.getFieldsName().get(idx)).append('=').append(getFieldValue(idx));
    }
    return sb.toString();
  }

  /** Two rows are equal when their {@link #toString()} representations are equal. */
  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (obj == null) {
      return false;
    }
    if (getClass() != obj.getClass()) {
      return false;
    }
    BeamSQLRow other = (BeamSQLRow) obj;
    return toString().equals(other.toString());
  }

  /** Consistent with {@link #equals(Object)}, which compares {@link #toString()}. */
  @Override
  public int hashCode() {
    return toString().hashCode();
  }

}
// http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.dsls.sql.schema;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.calcite.sql.type.SqlTypeName;

/**
 * A {@link Coder} that encodes {@link BeamSQLRow}.
 *
 * <p>Wire format: the {@link BeamSQLRecordType}, the list of null-field indices, then every
 * non-null field in index order. Every component is written and read in the nested
 * context, so decode reads exactly what encode wrote. FLOAT values are widened to double
 * on the wire and narrowed back on decode. TIME/TIMESTAMP values are written as epoch
 * milliseconds.
 */
public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow> {
  private static final BeamSQLRecordTypeCoder recordTypeCoder = BeamSQLRecordTypeCoder.of();

  private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of());

  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
  private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of();
  private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
  private static final DoubleCoder doubleCoder = DoubleCoder.of();

  private static final BeamSqlRowCoder INSTANCE = new BeamSqlRowCoder();

  private BeamSqlRowCoder() {}

  /** Returns the shared singleton instance. */
  public static BeamSqlRowCoder of() {
    return INSTANCE;
  }

  @Override
  public void encode(BeamSQLRow value, OutputStream outStream,
      org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
    // Neither the record type nor the null list is the last element written, so both
    // go out nested, the same as every field below.
    Context nested = context.nested();
    recordTypeCoder.encode(value.getDataType(), outStream, nested);
    listCoder.encode(value.getNullFields(), outStream, nested);

    List<SqlTypeName> fieldTypes = value.getDataType().getFieldsType();
    for (int idx = 0; idx < value.size(); ++idx) {
      if (value.getNullFields().contains(idx)) {
        continue;
      }

      SqlTypeName fieldType = fieldTypes.get(idx);
      switch (fieldType) {
        case INTEGER:
        case SMALLINT:
        case TINYINT:
          intCoder.encode(value.getInteger(idx), outStream, nested);
          break;
        case DOUBLE:
          doubleCoder.encode(value.getDouble(idx), outStream, nested);
          break;
        case FLOAT:
          // FLOAT is stored as Float; getDouble() would throw ClassCastException here.
          doubleCoder.encode((double) value.getFloat(idx), outStream, nested);
          break;
        case BIGINT:
          longCoder.encode(value.getLong(idx), outStream, nested);
          break;
        case VARCHAR:
          stringCoder.encode(value.getString(idx), outStream, nested);
          break;
        case TIME:
        case TIMESTAMP:
          longCoder.encode(value.getDate(idx).getTime(), outStream, nested);
          break;
        default:
          throw new UnsupportedDataTypeException(fieldType);
      }
    }
  }

  @Override
  public BeamSQLRow decode(InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
      throws CoderException, IOException {
    // Must mirror encode(): with the outer context StringUtf8Coder would read to end of stream.
    Context nested = context.nested();
    BeamSQLRecordType type = recordTypeCoder.decode(inStream, nested);
    List<Integer> nullFields = listCoder.decode(inStream, nested);

    BeamSQLRow record = new BeamSQLRow(type);
    record.setNullFields(nullFields);

    for (int idx = 0; idx < type.size(); ++idx) {
      if (nullFields.contains(idx)) {
        continue;
      }

      SqlTypeName fieldType = type.getFieldsType().get(idx);
      switch (fieldType) {
        case INTEGER:
        case SMALLINT:
        case TINYINT:
          record.addField(idx, intCoder.decode(inStream, nested));
          break;
        case DOUBLE:
          record.addField(idx, doubleCoder.decode(inStream, nested));
          break;
        case FLOAT:
          // addField() requires a Float for FLOAT columns.
          record.addField(idx, doubleCoder.decode(inStream, nested).floatValue());
          break;
        case BIGINT:
          record.addField(idx, longCoder.decode(inStream, nested));
          break;
        case VARCHAR:
          record.addField(idx, stringCoder.decode(inStream, nested));
          break;
        case TIME:
        case TIMESTAMP:
          record.addField(idx, new Date(longCoder.decode(inStream, nested)));
          break;
        default:
          throw new UnsupportedDataTypeException(fieldType);
      }
    }

    return record;
  }

  /** This coder has no component coders, so it returns an empty list (never {@code null}). */
  @Override
  public List<? extends Coder<?>> getCoderArguments() {
    return Collections.emptyList();
  }

  /**
   * No-op. NOTE(review): DOUBLE/FLOAT fields are encoded with {@link DoubleCoder}, which
   * Beam does not itself report as deterministic. Confirm this before using rows as
   * grouping keys.
   */
  @Override
  public void verifyDeterministic()
      throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
  }

}
// http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/InvalidFieldException.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.dsls.sql.schema;

/**
 * Thrown when a field value does not match the SQL type declared for that field.
 */
public class InvalidFieldException extends RuntimeException {

  public InvalidFieldException() {
    super();
  }

  public InvalidFieldException(String message) {
    super(message);
  }

  /** Creates an exception that preserves the underlying {@code cause}. */
  public InvalidFieldException(String message, Throwable cause) {
    super(message, cause);
  }

}
// http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/UnsupportedDataTypeException.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.dsls.sql.schema;

import org.apache.calcite.sql.type.SqlTypeName;

/**
 * Thrown when a {@link SqlTypeName} is not supported by the Beam SQL row representation.
 */
public class UnsupportedDataTypeException extends RuntimeException {

  public UnsupportedDataTypeException(SqlTypeName unsupportedType) {
    super(String.format("Not support data type [%s]", unsupportedType));
  }

}
// http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.dsls.sql.schema.kafka;

import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.List;
import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
import org.apache.beam.dsls.sql.schema.UnsupportedDataTypeException;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.sql.type.SqlTypeName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A Kafka topic whose record values are UTF-8 CSV lines.
 *
 * <p>Columns are separated by {@link #DELIMITER} and are neither quoted nor escaped, so a
 * VARCHAR value must not contain the delimiter. TIME/TIMESTAMP columns hold epoch
 * milliseconds. Null fields are written as empty columns, and an empty non-VARCHAR column
 * is read back as null. Malformed records are logged and dropped.
 */
public class BeamKafkaCSVTable extends BeamKafkaTable {
  private static final long serialVersionUID = 4754022536543333984L;

  public static final String DELIMITER = ",";
  private static final Logger LOG = LoggerFactory.getLogger(BeamKafkaCSVTable.class);

  public BeamKafkaCSVTable(RelProtoDataType protoRowType, String bootstrapServers,
      List<String> topics) {
    super(protoRowType, bootstrapServers, topics);
  }

  @Override
  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>>
      getPTransformForInput() {
    return new CsvRecorderDecoder(beamSqlRecordType);
  }

  @Override
  public PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>>
      getPTransformForOutput() {
    return new CsvRecorderEncoder(beamSqlRecordType);
  }

  /**
   * Converts one CSV column to the Java value {@link BeamSQLRow} expects for
   * {@code fieldType}. An empty non-VARCHAR column maps to {@code null}.
   *
   * @throws NumberFormatException if a numeric or time column cannot be parsed
   * @throws UnsupportedDataTypeException if {@code fieldType} is not supported
   */
  private static Object parseField(String raw, SqlTypeName fieldType) {
    if (fieldType == SqlTypeName.VARCHAR) {
      return raw;
    }
    String text = raw.trim();
    switch (fieldType) {
      case INTEGER:
      case SMALLINT:
      case TINYINT:
        return text.isEmpty() ? null : Integer.valueOf(text);
      case DOUBLE:
        return text.isEmpty() ? null : Double.valueOf(text);
      case BIGINT:
        return text.isEmpty() ? null : Long.valueOf(text);
      case FLOAT:
        return text.isEmpty() ? null : Float.valueOf(text);
      case TIME:
      case TIMESTAMP:
        return text.isEmpty() ? null : new Date(Long.parseLong(text));
      default:
        throw new UnsupportedDataTypeException(fieldType);
    }
  }

  /** Renders one field value as a CSV column: empty for null, epoch millis for dates. */
  private static String formatField(Object value) {
    if (value == null) {
      return "";
    }
    if (value instanceof Date) {
      return String.valueOf(((Date) value).getTime());
    }
    return value.toString();
  }

  /**
   * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamSQLRow}.
   */
  public static class CsvRecorderDecoder
      extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>> {
    private BeamSQLRecordType recordType;

    public CsvRecorderDecoder(BeamSQLRecordType recordType) {
      this.recordType = recordType;
    }

    @Override
    public PCollection<BeamSQLRow> expand(PCollection<KV<byte[], byte[]>> input) {
      return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamSQLRow>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          byte[] payload = c.element().getValue();
          if (payload == null) {
            LOG.error("invalid record: null value");
            return;
          }
          String rowInString = new String(payload, StandardCharsets.UTF_8);
          // Limit -1 keeps trailing empty columns, e.g. "a,b," has three parts.
          String[] parts = rowInString.split(BeamKafkaCSVTable.DELIMITER, -1);
          if (parts.length != recordType.size()) {
            LOG.error("invalid record: {}", rowInString);
            return;
          }
          BeamSQLRow sourceRecord = new BeamSQLRow(recordType);
          try {
            for (int idx = 0; idx < parts.length; ++idx) {
              sourceRecord.addField(idx,
                  parseField(parts[idx], recordType.getFieldsType().get(idx)));
            }
          } catch (NumberFormatException e) {
            LOG.error("invalid record: {}", rowInString, e);
            return;
          }
          c.output(sourceRecord);
        }
      }));
    }
  }

  /**
   * A PTransform to convert {@link BeamSQLRow} to {@code KV<byte[], byte[]>} with an empty key.
   */
  public static class CsvRecorderEncoder
      extends PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>> {
    private BeamSQLRecordType recordType;

    public CsvRecorderEncoder(BeamSQLRecordType recordType) {
      this.recordType = recordType;
    }

    @Override
    public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamSQLRow> input) {
      return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSQLRow, KV<byte[], byte[]>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          BeamSQLRow in = c.element();
          StringBuilder sb = new StringBuilder();
          for (int idx = 0; idx < in.size(); ++idx) {
            if (idx > 0) {
              sb.append(DELIMITER);
            }
            sb.append(formatField(in.getFieldValue(idx)));
          }
          c.output(KV.of(new byte[] {}, sb.toString().getBytes(StandardCharsets.UTF_8)));
        }
      }));
    }

  }

}
// http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.dsls.sql.schema.kafka;

import static com.google.common.base.Preconditions.checkArgument;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import org.apache.beam.dsls.sql.schema.BaseBeamTable;
import org.apache.beam.dsls.sql.schema.BeamIOType;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.calcite.rel.type.RelProtoDataType;

/**
 * {@code BeamKafkaTable} represents a Kafka topic, used as a source or a target.
 * Subclasses supply the conversion between {@code BeamSQLRow} and
 * {@code KV<byte[], byte[]>}.
 */
public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable {
  private static final long serialVersionUID = -634715473399906527L;

  private String bootstrapServers;
  private List<String> topics;
  // Optional consumer overrides. Stays null unless updateConsumerProperties() is called.
  private Map<String, Object> configUpdates;

  protected BeamKafkaTable(RelProtoDataType protoRowType) {
    super(protoRowType);
  }

  public BeamKafkaTable(RelProtoDataType protoRowType, String bootstrapServers,
      List<String> topics) {
    super(protoRowType);
    this.bootstrapServers = bootstrapServers;
    this.topics = topics;
  }

  /** Sets extra Kafka consumer properties applied by {@link #buildIOReader()}. Returns {@code this}. */
  public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) {
    this.configUpdates = configUpdates;
    return this;
  }

  @Override
  public BeamIOType getSourceType() {
    return BeamIOType.UNBOUNDED;
  }

  public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>>
      getPTransformForInput();

  public abstract PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>>
      getPTransformForOutput();

  @Override
  public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() {
    return new PTransform<PBegin, PCollection<BeamSQLRow>>() {

      @Override
      public PCollection<BeamSQLRow> expand(PBegin input) {
        KafkaIO.Read<byte[], byte[]> kafkaRead = KafkaIO.<byte[], byte[]>read()
            .withBootstrapServers(bootstrapServers)
            .withTopics(topics)
            .withKeyCoder(ByteArrayCoder.of())
            .withValueCoder(ByteArrayCoder.of());
        // Only apply overrides when they were provided; passing null would fail.
        if (configUpdates != null) {
          kafkaRead = kafkaRead.updateConsumerProperties(configUpdates);
        }
        return input.apply("read", kafkaRead.withoutMetadata())
            .apply("in_format", getPTransformForInput());
      }
    };
  }

  @Override
  public PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter() {
    checkArgument(topics != null && topics.size() == 1,
        "Only one topic can be acceptable as output.");

    return new PTransform<PCollection<BeamSQLRow>, PDone>() {
      @Override
      public PDone expand(PCollection<BeamSQLRow> input) {
        return input.apply("out_reformat", getPTransformForOutput()).apply("persistent",
            KafkaIO.<byte[], byte[]>write().withBootstrapServers(bootstrapServers)
                .withTopic(topics.get(0)).withKeyCoder(ByteArrayCoder.of())
                .withValueCoder(ByteArrayCoder.of()));
      }
    };
  }

}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/package-info.java new file mode 100644 index 0000000..0418372 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * table schema for KafkaIO.
+ */ +package org.apache.beam.dsls.sql.schema.kafka; http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java new file mode 100644 index 0000000..47de06f --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * define table schema, to map with Beam IO components. 
+ * + */ +package org.apache.beam.dsls.sql.schema; http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLFilterFn.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLFilterFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLFilterFn.java new file mode 100644 index 0000000..55086e2 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLFilterFn.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.transform; + +import java.util.List; +import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor; +import org.apache.beam.dsls.sql.rel.BeamFilterRel; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.transforms.DoFn; + +/** + * {@code BeamSQLFilterFn} is the executor for a {@link BeamFilterRel} step. 
+ * + */ +public class BeamSQLFilterFn extends DoFn<BeamSQLRow, BeamSQLRow> { + /** + * + */ + private static final long serialVersionUID = -1256111753670606705L; + + private String stepName; + private BeamSQLExpressionExecutor executor; + + public BeamSQLFilterFn(String stepName, BeamSQLExpressionExecutor executor) { + super(); + this.stepName = stepName; + this.executor = executor; + } + + @Setup + public void setup() { + executor.prepare(); + } + + @ProcessElement + public void processElement(ProcessContext c) { + BeamSQLRow in = c.element(); + + List<Object> result = executor.execute(in); + + if ((Boolean) result.get(0)) { + c.output(in); + } + } + + @Teardown + public void close() { + executor.close(); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java new file mode 100644 index 0000000..92ebff2 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.transform; + +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.transforms.DoFn; + +/** + * A test PTransform to display output in console. + * + */ +public class BeamSQLOutputToConsoleFn extends DoFn<BeamSQLRow, Void> { + /** + * + */ + private static final long serialVersionUID = -1256111753670606705L; + + private String stepName; + + public BeamSQLOutputToConsoleFn(String stepName) { + super(); + this.stepName = stepName; + } + + @ProcessElement + public void processElement(ProcessContext c) { + System.out.println("Output: " + c.element().getDataValues()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java new file mode 100644 index 0000000..bafdd17 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.transform; + +import java.util.List; +import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor; +import org.apache.beam.dsls.sql.rel.BeamProjectRel; +import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.transforms.DoFn; + +/** + * + * {@code BeamSQLProjectFn} is the executor for a {@link BeamProjectRel} step. + * + */ +public class BeamSQLProjectFn extends DoFn<BeamSQLRow, BeamSQLRow> { + + /** + * + */ + private static final long serialVersionUID = -1046605249999014608L; + private String stepName; + private BeamSQLExpressionExecutor executor; + private BeamSQLRecordType outputRecordType; + + public BeamSQLProjectFn(String stepName, BeamSQLExpressionExecutor executor, + BeamSQLRecordType outputRecordType) { + super(); + this.stepName = stepName; + this.executor = executor; + this.outputRecordType = outputRecordType; + } + + @Setup + public void setup() { + executor.prepare(); + } + + @ProcessElement + public void processElement(ProcessContext c) { + List<Object> results = executor.execute(c.element()); + + BeamSQLRow outRow = new BeamSQLRow(outputRecordType); + for (int idx = 0; idx < results.size(); ++idx) { + outRow.addField(idx, results.get(idx)); + } + + c.output(outRow); + } + + @Teardown + public void close() { + executor.close(); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/package-info.java 
---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/package-info.java new file mode 100644 index 0000000..cd2bdeb --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * {@link org.apache.beam.sdk.transforms.PTransform} used in a BeamSQL pipeline. + */ +package org.apache.beam.dsls.sql.transform; http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/example/BeamSqlExample.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/beam/dsls/sql/example/BeamSqlExample.java deleted file mode 100644 index 7fb8def..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/example/BeamSqlExample.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.example; - -import java.io.Serializable; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.beam.dsls.sql.planner.BeamSqlRunner; -import org.beam.dsls.sql.schema.BaseBeamTable; -import org.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable; - -/** - * This is one quick example. 
- * <p>Before start, follow https://kafka.apache.org/quickstart to setup a Kafka - * cluster locally, and run below commands to create required Kafka topics: - * <pre> - * <code> - * bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \ - * --partitions 1 --topic orders - * bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \ - * --partitions 1 --topic sub_orders - * </code> - * </pre> - * After run the application, produce several test records: - * <pre> - * <code> - * bin/kafka-console-producer.sh --broker-list localhost:9092 --topic orders - * invalid,record - * 123445,0,100,3413423 - * 234123,3,232,3451231234 - * 234234,0,5,1234123 - * 345234,0,345234.345,3423 - * </code> - * </pre> - * Meanwhile, open another console to see the output: - * <pre> - * <code> - * bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sub_orders - * **Expected : - * 123445,0,100.0 - * 345234,0,345234.345 - * </code> - * </pre> - */ -public class BeamSqlExample implements Serializable { - - /** - * - */ - private static final long serialVersionUID = 3673487843555563904L; - - public static void main(String[] args) throws Exception { - BeamSqlRunner runner = new BeamSqlRunner(); - runner.addTable("ORDER_DETAILS", getTable("127.0.0.1:9092", "orders")); - runner.addTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders")); - - // case 2: insert into <table>(<fields>) select STREAM <fields> from - // <table> from <clause> - String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) " + "SELECT " - + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20"; - - runner.explainQuery(sql); - runner.submitQuery(sql); - } - - public static BaseBeamTable getTable(String bootstrapServer, String topic) { - final RelProtoDataType protoRowType = new RelProtoDataType() { - @Override - public RelDataType apply(RelDataTypeFactory a0) { - return a0.builder().add("order_id", 
SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER) - .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build(); - } - }; - - Map<String, Object> consumerPara = new HashMap<String, Object>(); - consumerPara.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); - - return new BeamKafkaCSVTable(protoRowType, bootstrapServer, Arrays.asList(topic)) - .updateConsumerProperties(consumerPara); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/example/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/example/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/example/package-info.java deleted file mode 100644 index ae678e4..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/example/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * examples on how to use BeamSQL. 
- * - */ -package org.beam.dsls.sql.example; http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java b/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java deleted file mode 100644 index e9d425d..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.interpreter; - -import java.io.Serializable; -import java.util.List; -import org.beam.dsls.sql.schema.BeamSQLRow; - -/** - * {@code BeamSQLExpressionExecutor} fills the gap between relational - * expressions in Calcite SQL and executable code. - * - */ -public interface BeamSQLExpressionExecutor extends Serializable { - - /** - * invoked before data processing. - */ - void prepare(); - - /** - * apply transformation to input record {@link BeamSQLRow}. 
- * - */ - List<Object> execute(BeamSQLRow inputRecord); - - void close(); -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java b/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java deleted file mode 100644 index 48306da..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.beam.dsls.sql.interpreter; - -import static com.google.common.base.Preconditions.checkArgument; -import java.util.ArrayList; -import java.util.List; -import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexLiteral; -import org.apache.calcite.rex.RexNode; -import org.beam.dsls.sql.planner.BeamSqlUnsupportedException; -import org.beam.dsls.sql.rel.BeamFilterRel; -import org.beam.dsls.sql.rel.BeamProjectRel; -import org.beam.dsls.sql.rel.BeamRelNode; -import org.beam.dsls.sql.schema.BeamSQLRow; -import org.springframework.expression.Expression; -import org.springframework.expression.ExpressionParser; -import org.springframework.expression.spel.SpelParserConfiguration; -import org.springframework.expression.spel.standard.SpelExpressionParser; -import org.springframework.expression.spel.support.StandardEvaluationContext; - -/** - * {@code BeamSQLSpELExecutor} is one implementation, to convert Calcite SQL - * relational expression to SpEL expression. 
- * - */ -public class BeamSQLSpELExecutor implements BeamSQLExpressionExecutor { - /** - * - */ - private static final long serialVersionUID = 6777232573390074408L; - - private List<String> spelString; - private List<Expression> spelExpressions; - - public BeamSQLSpELExecutor(BeamRelNode relNode) { - this.spelString = new ArrayList<>(); - if (relNode instanceof BeamFilterRel) { - String filterSpEL = CalciteToSpEL - .rexcall2SpEL((RexCall) ((BeamFilterRel) relNode).getCondition()); - spelString.add(filterSpEL); - } else if (relNode instanceof BeamProjectRel) { - spelString.addAll(createProjectExps((BeamProjectRel) relNode)); - // List<ProjectRule> projectRules = - // for (int idx = 0; idx < projectRules.size(); ++idx) { - // spelString.add(projectRules.get(idx).getProjectExp()); - // } - } else { - throw new BeamSqlUnsupportedException( - String.format("%s is not supported yet", relNode.getClass().toString())); - } - } - - @Override - public void prepare() { - this.spelExpressions = new ArrayList<>(); - - SpelParserConfiguration config = new SpelParserConfiguration(true, true); - ExpressionParser parser = new SpelExpressionParser(config); - for (String el : spelString) { - spelExpressions.add(parser.parseExpression(el)); - } - } - - @Override - public List<Object> execute(BeamSQLRow inputRecord) { - StandardEvaluationContext inContext = new StandardEvaluationContext(); - inContext.setVariable("in", inputRecord); - - List<Object> results = new ArrayList<>(); - for (Expression ep : spelExpressions) { - results.add(ep.getValue(inContext)); - } - return results; - } - - @Override - public void close() { - - } - - private List<String> createProjectExps(BeamProjectRel projectRel) { - List<String> rules = new ArrayList<>(); - - List<RexNode> exps = projectRel.getProjects(); - - for (int idx = 0; idx < exps.size(); ++idx) { - RexNode node = exps.get(idx); - if (node == null) { - rules.add("null"); - } - - if (node instanceof RexLiteral) { - rules.add(((RexLiteral) 
node).getValue() + ""); - } else { - if (node instanceof RexInputRef) { - rules.add("#in.getFieldValue(" + ((RexInputRef) node).getIndex() + ")"); - } - if (node instanceof RexCall) { - rules.add(CalciteToSpEL.rexcall2SpEL((RexCall) node)); - } - } - } - - checkArgument(rules.size() == exps.size(), "missing projects rules after conversion."); - - return rules; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/CalciteToSpEL.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/CalciteToSpEL.java b/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/CalciteToSpEL.java deleted file mode 100644 index c7cbace..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/CalciteToSpEL.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.beam.dsls.sql.interpreter; - -import com.google.common.base.Joiner; -import java.util.ArrayList; -import java.util.List; -import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexNode; -import org.beam.dsls.sql.planner.BeamSqlUnsupportedException; - -/** - * {@code CalciteToSpEL} is used in {@link BeamSQLSpELExecutor}, to convert a - * relational expression {@link RexCall} to SpEL expression. - * - */ -public class CalciteToSpEL { - - public static String rexcall2SpEL(RexCall cdn) { - List<String> parts = new ArrayList<>(); - for (RexNode subcdn : cdn.operands) { - if (subcdn instanceof RexCall) { - parts.add(rexcall2SpEL((RexCall) subcdn)); - } else { - parts.add(subcdn instanceof RexInputRef - ? "#in.getFieldValue(" + ((RexInputRef) subcdn).getIndex() + ")" : subcdn.toString()); - } - } - - String opName = cdn.op.getName(); - switch (cdn.op.getClass().getSimpleName()) { - case "SqlMonotonicBinaryOperator": // +-* - case "SqlBinaryOperator": // > < = >= <= <> OR AND || / . - switch (cdn.op.getName().toUpperCase()) { - case "AND": - return String.format(" ( %s ) ", Joiner.on("&&").join(parts)); - case "OR": - return String.format(" ( %s ) ", Joiner.on("||").join(parts)); - case "=": - return String.format(" ( %s ) ", Joiner.on("==").join(parts)); - case "<>": - return String.format(" ( %s ) ", Joiner.on("!=").join(parts)); - default: - return String.format(" ( %s ) ", Joiner.on(cdn.op.getName().toUpperCase()).join(parts)); - } - case "SqlCaseOperator": // CASE - return String.format(" (%s ? 
%s : %s)", parts.get(0), parts.get(1), parts.get(2)); - case "SqlCastFunction": // CAST - return parts.get(0); - case "SqlPostfixOperator": - switch (opName.toUpperCase()) { - case "IS NULL": - return String.format(" null == %s ", parts.get(0)); - case "IS NOT NULL": - return String.format(" null != %s ", parts.get(0)); - default: - throw new BeamSqlUnsupportedException(); - } - default: - throw new BeamSqlUnsupportedException(); - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/package-info.java deleted file mode 100644 index 85235e2..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * interpreter generate runnable 'code' to execute SQL relational expressions. 
- */ -package org.beam.dsls.sql.interpreter; http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/package-info.java deleted file mode 100644 index c6f5cf6..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * BeamSQL provides a new interface to run a SQL statement with Beam. 
- */ -package org.beam.dsls.sql; http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamPipelineCreator.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamPipelineCreator.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamPipelineCreator.java deleted file mode 100644 index 5a0c73d..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamPipelineCreator.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.beam.dsls.sql.planner; - -import java.util.Map; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.values.PCollection; -import org.beam.dsls.sql.rel.BeamRelNode; -import org.beam.dsls.sql.schema.BaseBeamTable; -import org.beam.dsls.sql.schema.BeamSQLRecordType; -import org.beam.dsls.sql.schema.BeamSQLRecordTypeCoder; -import org.beam.dsls.sql.schema.BeamSQLRow; -import org.beam.dsls.sql.schema.BeamSqlRowCoder; - -/** - * {@link BeamPipelineCreator} converts a {@link BeamRelNode} tree, into a Beam - * pipeline. - * - */ -public class BeamPipelineCreator { - private Map<String, BaseBeamTable> sourceTables; - private PCollection<BeamSQLRow> latestStream; - - private PipelineOptions options; - - private Pipeline pipeline; - - private boolean hasPersistent = false; - - public BeamPipelineCreator(Map<String, BaseBeamTable> sourceTables) { - this.sourceTables = sourceTables; - - options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation() - .as(PipelineOptions.class); // FlinkPipelineOptions.class - options.setJobName("BeamPlanCreator"); - - pipeline = Pipeline.create(options); - CoderRegistry cr = pipeline.getCoderRegistry(); - cr.registerCoder(BeamSQLRow.class, BeamSqlRowCoder.of()); - cr.registerCoder(BeamSQLRecordType.class, BeamSQLRecordTypeCoder.of()); - } - - public PCollection<BeamSQLRow> getLatestStream() { - return latestStream; - } - - public void setLatestStream(PCollection<BeamSQLRow> latestStream) { - this.latestStream = latestStream; - } - - public Map<String, BaseBeamTable> getSourceTables() { - return sourceTables; - } - - public Pipeline getPipeline() { - return pipeline; - } - - public boolean isHasPersistent() { - return hasPersistent; - } - - public void setHasPersistent(boolean hasPersistent) { - this.hasPersistent = hasPersistent; - } - -} 
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamQueryPlanner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamQueryPlanner.java deleted file mode 100644 index a31ace0..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamQueryPlanner.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.beam.dsls.sql.planner; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult; -import org.apache.calcite.adapter.java.JavaTypeFactory; -import org.apache.calcite.config.Lex; -import org.apache.calcite.jdbc.CalciteSchema; -import org.apache.calcite.jdbc.JavaTypeFactoryImpl; -import org.apache.calcite.plan.Contexts; -import org.apache.calcite.plan.ConventionTraitDef; -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.plan.RelTraitDef; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.prepare.CalciteCatalogReader; -import org.apache.calcite.rel.RelCollationTraitDef; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.SqlOperatorTable; -import org.apache.calcite.sql.fun.SqlStdOperatorTable; -import org.apache.calcite.sql.parser.SqlParseException; -import org.apache.calcite.sql.parser.SqlParser; -import org.apache.calcite.tools.FrameworkConfig; -import org.apache.calcite.tools.Frameworks; -import org.apache.calcite.tools.Planner; -import org.apache.calcite.tools.RelConversionException; -import org.apache.calcite.tools.ValidationException; -import org.beam.dsls.sql.rel.BeamLogicalConvention; -import org.beam.dsls.sql.rel.BeamRelNode; -import org.beam.dsls.sql.schema.BaseBeamTable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The core component to handle through a SQL statement, to submit a Beam - * pipeline. 
- * - */ -public class BeamQueryPlanner { - private static final Logger LOG = LoggerFactory.getLogger(BeamQueryPlanner.class); - - protected final Planner planner; - private Map<String, BaseBeamTable> sourceTables = new HashMap<>(); - - public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl( - RelDataTypeSystem.DEFAULT); - - public BeamQueryPlanner(SchemaPlus schema) { - final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>(); - traitDefs.add(ConventionTraitDef.INSTANCE); - traitDefs.add(RelCollationTraitDef.INSTANCE); - - List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>(); - sqlOperatorTables.add(SqlStdOperatorTable.instance()); - sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema), false, - Collections.<String>emptyList(), TYPE_FACTORY)); - - FrameworkConfig config = Frameworks.newConfigBuilder() - .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema) - .traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets()) - .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM).build(); - this.planner = Frameworks.getPlanner(config); - - for (String t : schema.getTableNames()) { - sourceTables.put(t, (BaseBeamTable) schema.getTable(t)); - } - } - - /** - * With a Beam pipeline generated in {@link #compileBeamPipeline(String)}, - * submit it to run and wait until finish. - * - */ - public void submitToRun(String sqlStatement) throws Exception { - Pipeline pipeline = compileBeamPipeline(sqlStatement); - - PipelineResult result = pipeline.run(); - result.waitUntilFinish(); - } - - /** - * With the @{@link BeamRelNode} tree generated in - * {@link #convertToBeamRel(String)}, a Beam pipeline is generated. 
- * - */ - public Pipeline compileBeamPipeline(String sqlStatement) throws Exception { - BeamRelNode relNode = convertToBeamRel(sqlStatement); - - BeamPipelineCreator planCreator = new BeamPipelineCreator(sourceTables); - return relNode.buildBeamPipeline(planCreator); - } - - /** - * It parses and validate the input query, then convert into a - * {@link BeamRelNode} tree. - * - */ - public BeamRelNode convertToBeamRel(String sqlStatement) - throws ValidationException, RelConversionException, SqlParseException { - return (BeamRelNode) validateAndConvert(planner.parse(sqlStatement)); - } - - private RelNode validateAndConvert(SqlNode sqlNode) - throws ValidationException, RelConversionException { - SqlNode validated = validateNode(sqlNode); - LOG.info("SQL:\n" + validated); - RelNode relNode = convertToRelNode(validated); - return convertToBeamRel(relNode); - } - - private RelNode convertToBeamRel(RelNode relNode) throws RelConversionException { - RelTraitSet traitSet = relNode.getTraitSet(); - - LOG.info("SQLPlan>\n" + RelOptUtil.toString(relNode)); - - // PlannerImpl.transform() optimizes RelNode with ruleset - return planner.transform(0, traitSet.plus(BeamLogicalConvention.INSTANCE), relNode); - } - - private RelNode convertToRelNode(SqlNode sqlNode) throws RelConversionException { - return planner.rel(sqlNode).rel; - } - - private SqlNode validateNode(SqlNode sqlNode) throws ValidationException { - SqlNode validatedSqlNode = planner.validate(sqlNode); - validatedSqlNode.accept(new UnsupportedOperatorsVisitor()); - return validatedSqlNode; - } - - public Map<String, BaseBeamTable> getSourceTables() { - return sourceTables; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRelDataTypeSystem.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRelDataTypeSystem.java 
b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRelDataTypeSystem.java deleted file mode 100644 index bf35296..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRelDataTypeSystem.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.planner; - -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.rel.type.RelDataTypeSystemImpl; - -/** - * customized data type in Beam. 
- * - */ -public class BeamRelDataTypeSystem extends RelDataTypeSystemImpl { - public static final RelDataTypeSystem BEAM_REL_DATATYPE_SYSTEM = new BeamRelDataTypeSystem(); - - @Override - public int getMaxNumericScale() { - return 38; - } - - @Override - public int getMaxNumericPrecision() { - return 38; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRuleSets.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRuleSets.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRuleSets.java deleted file mode 100644 index 3f40c27..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRuleSets.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.beam.dsls.sql.planner; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import java.util.Iterator; -import org.apache.calcite.plan.RelOptRule; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.tools.RuleSet; -import org.beam.dsls.sql.rel.BeamRelNode; -import org.beam.dsls.sql.rule.BeamFilterRule; -import org.beam.dsls.sql.rule.BeamIOSinkRule; -import org.beam.dsls.sql.rule.BeamIOSourceRule; -import org.beam.dsls.sql.rule.BeamProjectRule; - -/** - * {@link RuleSet} used in {@link BeamQueryPlanner}. It translates a standard - * Calcite {@link RelNode} tree, to represent with {@link BeamRelNode} - * - */ -public class BeamRuleSets { - private static final ImmutableSet<RelOptRule> calciteToBeamConversionRules = ImmutableSet - .<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE, - BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE) - .build(); - - public static RuleSet[] getRuleSets() { - return new RuleSet[] { new BeamRuleSet( - ImmutableSet.<RelOptRule>builder().addAll(calciteToBeamConversionRules).build()) }; - } - - private static class BeamRuleSet implements RuleSet { - final ImmutableSet<RelOptRule> rules; - - public BeamRuleSet(ImmutableSet<RelOptRule> rules) { - this.rules = rules; - } - - public BeamRuleSet(ImmutableList<RelOptRule> rules) { - this.rules = ImmutableSet.<RelOptRule>builder().addAll(rules).build(); - } - - @Override - public Iterator<RelOptRule> iterator() { - return rules.iterator(); - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSQLRelUtils.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSQLRelUtils.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSQLRelUtils.java deleted file mode 100644 index 94b341c..0000000 --- 
a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSQLRelUtils.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.planner; - -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.plan.volcano.RelSubset; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.sql.SqlExplainLevel; -import org.beam.dsls.sql.rel.BeamRelNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Utilities for {@code BeamRelNode}. 
- */ -public class BeamSQLRelUtils { - private static final Logger LOG = LoggerFactory.getLogger(BeamSQLRelUtils.class); - - private static final AtomicInteger sequence = new AtomicInteger(0); - private static final AtomicInteger classSequence = new AtomicInteger(0); - - public static String getStageName(BeamRelNode relNode) { - return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_" - + sequence.getAndIncrement(); - } - - public static String getClassName(BeamRelNode relNode) { - return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() - + "_" + classSequence.getAndIncrement(); - } - - public static BeamRelNode getBeamRelInput(RelNode input) { - if (input instanceof RelSubset) { - // go with known best input - input = ((RelSubset) input).getBest(); - } - return (BeamRelNode) input; - } - - public static String explain(final RelNode rel) { - return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES); - } - - public static String explain(final RelNode rel, SqlExplainLevel detailLevel) { - String explain = ""; - try { - explain = RelOptUtil.toString(rel); - } catch (StackOverflowError e) { - LOG.error("StackOverflowError occurred while extracting plan. " - + "Please report it to the dev@ mailing list."); - LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e); - LOG.error("Forcing plan to empty string and continue... 
" - + "SQL Runner may not working properly after."); - } - return explain; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlRunner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlRunner.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlRunner.java deleted file mode 100644 index 9581fcd..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlRunner.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.beam.dsls.sql.planner; - -import java.io.Serializable; -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.schema.Schema; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.sql.parser.SqlParseException; -import org.apache.calcite.tools.Frameworks; -import org.apache.calcite.tools.RelConversionException; -import org.apache.calcite.tools.ValidationException; -import org.beam.dsls.sql.rel.BeamRelNode; -import org.beam.dsls.sql.schema.BaseBeamTable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Interface to explain, submit a SQL query. - * - */ -public class BeamSqlRunner implements Serializable { - /** - * - */ - private static final long serialVersionUID = -4708693435115005182L; - - private static final Logger LOG = LoggerFactory.getLogger(BeamSqlRunner.class); - - private SchemaPlus schema = Frameworks.createRootSchema(true); - - private BeamQueryPlanner planner = new BeamQueryPlanner(schema); - - /** - * Add a schema. - * - */ - public void addSchema(String schemaName, Schema scheme) { - schema.add(schemaName, schema); - } - - /** - * add a {@link BaseBeamTable} to schema repository. - * - */ - public void addTable(String tableName, BaseBeamTable table) { - schema.add(tableName, table); - planner.getSourceTables().put(tableName, table); - } - - /** - * submit as a Beam pipeline. - * - */ - public void submitQuery(String sqlString) throws Exception { - planner.submitToRun(sqlString); - planner.planner.close(); - } - - /** - * explain and display the execution plan. 
- * - */ - public String explainQuery(String sqlString) - throws ValidationException, RelConversionException, SqlParseException { - BeamRelNode exeTree = planner.convertToBeamRel(sqlString); - String beamPlan = RelOptUtil.toString(exeTree); - System.out.println(String.format("beamPlan>\n%s", beamPlan)); - - planner.planner.close(); - return beamPlan; - } - - protected BeamQueryPlanner getPlanner() { - return planner; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlUnsupportedException.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlUnsupportedException.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlUnsupportedException.java deleted file mode 100644 index a3475bb..0000000 --- a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlUnsupportedException.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.planner; - -/** - * Generic exception for un-supported operations. 
- * - */ -public class BeamSqlUnsupportedException extends RuntimeException { - /** - * - */ - private static final long serialVersionUID = 3445015747629217342L; - - public BeamSqlUnsupportedException(String string) { - super(string); - } - - public BeamSqlUnsupportedException() { - super(); - } - -}
