http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
new file mode 100644
index 0000000..b910c84
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.values.BeamRecord;
+
+/**
+ * A {@link Coder} that encodes a {@link BeamRecord}.
+ */
+@Experimental
+public class BeamSqlRecordHelper {
+
+  public static BeamSqlRecordType getSqlRecordType(BeamRecord record) {
+    return (BeamSqlRecordType) record.getDataType();
+  }
+
+  /**
+   * {@link Coder} for the Java type {@link Short}.
+   */
+  public static class ShortCoder extends CustomCoder<Short> {
+    private static final ShortCoder INSTANCE = new ShortCoder();
+
+    public static ShortCoder of() {
+      return INSTANCE;
+    }
+
+    private ShortCoder() {
+    }
+
+    @Override
+    public void encode(Short value, OutputStream outStream) throws CoderException, IOException {
+      new DataOutputStream(outStream).writeShort(value);
+    }
+
+    @Override
+    public Short decode(InputStream inStream) throws CoderException, IOException {
+      return new DataInputStream(inStream).readShort();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+    }
+  }
+
+  /**
+   * {@link Coder} for the Java type {@link Float}; the value is stored as a {@link BigDecimal}.
+   */
+  public static class FloatCoder extends CustomCoder<Float> {
+    private static final FloatCoder INSTANCE = new FloatCoder();
+    private static final BigDecimalCoder CODER = BigDecimalCoder.of();
+
+    public static FloatCoder of() {
+      return INSTANCE;
+    }
+
+    private FloatCoder() {
+    }
+
+    @Override
+    public void encode(Float value, OutputStream outStream) throws CoderException, IOException {
+      CODER.encode(new BigDecimal(value), outStream);
+    }
+
+    @Override
+    public Float decode(InputStream inStream) throws CoderException, IOException {
+      return CODER.decode(inStream).floatValue();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+    }
+  }
+
+  /**
+   * {@link Coder} for the Java type {@link Double}; the value is stored as a {@link BigDecimal}.
+   */
+  public static class DoubleCoder extends CustomCoder<Double> {
+    private static final DoubleCoder INSTANCE = new DoubleCoder();
+    private static final BigDecimalCoder CODER = BigDecimalCoder.of();
+
+    public static DoubleCoder of() {
+      return INSTANCE;
+    }
+
+    private DoubleCoder() {
+    }
+
+    @Override
+    public void encode(Double value, OutputStream outStream) throws CoderException, IOException {
+      CODER.encode(new BigDecimal(value), outStream);
+    }
+
+    @Override
+    public Double decode(InputStream inStream) throws CoderException, IOException {
+      return CODER.decode(inStream).doubleValue();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+    }
+  }
+
+  /**
+   * {@link Coder} for the Java type {@link GregorianCalendar}; the value is stored as a
+   * {@link Long}.
+   */
+  public static class TimeCoder extends CustomCoder<GregorianCalendar> {
+    private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
+    private static final TimeCoder INSTANCE = new TimeCoder();
+
+    public static TimeCoder of() {
+      return INSTANCE;
+    }
+
+    private TimeCoder() {
+    }
+
+    @Override
+    public void encode(GregorianCalendar value, OutputStream outStream)
+        throws CoderException, IOException {
+      longCoder.encode(value.getTime().getTime(), outStream);
+    }
+
+    @Override
+    public GregorianCalendar decode(InputStream inStream) throws CoderException, IOException {
+      GregorianCalendar calendar = new GregorianCalendar();
+      calendar.setTime(new Date(longCoder.decode(inStream)));
+      return calendar;
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+    }
+  }
+
+  /**
+   * {@link Coder} for the Java type {@link Date}; the value is stored as a {@link Long}.
+   */
+  public static class DateCoder extends CustomCoder<Date> {
+    private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
+    private static final DateCoder INSTANCE = new DateCoder();
+
+    public static DateCoder of() {
+      return INSTANCE;
+    }
+
+    private DateCoder() {
+    }
+
+    @Override
+    public void encode(Date value, OutputStream outStream) throws CoderException, IOException {
+      longCoder.encode(value.getTime(), outStream);
+    }
+
+    @Override
+    public Date decode(InputStream inStream) throws CoderException, IOException {
+      return new Date(longCoder.decode(inStream));
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+    }
+  }
+
+  /**
+   * {@link Coder} for the Java type {@link Boolean}.
+   */
+  public static class BooleanCoder extends CustomCoder<Boolean> {
+    private static final BooleanCoder INSTANCE = new BooleanCoder();
+
+    public static BooleanCoder of() {
+      return INSTANCE;
+    }
+
+    private BooleanCoder() {
+    }
+
+    @Override
+    public void encode(Boolean value, OutputStream outStream) throws CoderException, IOException {
+      new DataOutputStream(outStream).writeBoolean(value);
+    }
+
+    @Override
+    public Boolean decode(InputStream inStream) throws CoderException, IOException {
+      return new DataInputStream(inStream).readBoolean();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+    }
+  }
+}
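For reference, every coder in this new helper follows the same Beam Coder contract: encode writes the value to an OutputStream, decode reads it back from an InputStream, so values round-trip through a plain byte array. A minimal sketch using only the signatures defined above (the wrapper class and main method are illustrative scaffolding, not part of the patch):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.FloatCoder;
    import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.ShortCoder;

    public class CoderRoundTripSketch {
      public static void main(String[] args) throws Exception {
        // SMALLINT values are written directly via DataOutputStream.writeShort(...).
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ShortCoder.of().encode((short) 42, out);
        Short s = ShortCoder.of().decode(new ByteArrayInputStream(out.toByteArray()));
        System.out.println(s); // 42

        // FLOAT values take the BigDecimal detour described in the javadoc:
        // Float -> BigDecimal on encode, BigDecimal.floatValue() on decode.
        out = new ByteArrayOutputStream();
        FloatCoder.of().encode(1.5f, out);
        Float f = FloatCoder.of().decode(new ByteArrayInputStream(out.toByteArray()));
        System.out.println(f); // 1.5
      }
    }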
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
new file mode 100644
index 0000000..b295049
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.BooleanCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DateCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DoubleCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.FloatCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.ShortCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.TimeCoder;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.BeamRecordType;
+
+/**
+ * Type provider for {@link BeamRecord} with SQL types.
+ *
+ * <p>Only a limited set of SQL types is supported for now; see
+ * <a href="https://beam.apache.org/blog/2017/07/21/sql-dsl.html#data-type">data types</a>
+ * for details.
+ */
+public class BeamSqlRecordType extends BeamRecordType {
+  private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
+  static {
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
+  }
+
+  public List<Integer> fieldsType;
+
+  protected BeamSqlRecordType(List<String> fieldsName, List<Coder> fieldsCoder) {
+    super(fieldsName, fieldsCoder);
+  }
+
+  private BeamSqlRecordType(List<String> fieldsName, List<Integer> fieldsType,
+      List<Coder> fieldsCoder) {
+    super(fieldsName, fieldsCoder);
+    this.fieldsType = fieldsType;
+  }
+
+  public static BeamSqlRecordType create(List<String> fieldNames,
+      List<Integer> fieldTypes) {
+    List<Coder> fieldCoders = new ArrayList<>();
+    for (int idx = 0; idx < fieldTypes.size(); ++idx) {
+      switch (fieldTypes.get(idx)) {
+        case Types.INTEGER:
+          fieldCoders.add(BigEndianIntegerCoder.of());
+          break;
+        case Types.SMALLINT:
+          fieldCoders.add(ShortCoder.of());
+          break;
+        case Types.TINYINT:
+          fieldCoders.add(ByteCoder.of());
+          break;
+        case Types.DOUBLE:
+          fieldCoders.add(DoubleCoder.of());
+          break;
+        case Types.FLOAT:
+          fieldCoders.add(FloatCoder.of());
+          break;
+        case Types.DECIMAL:
+          fieldCoders.add(BigDecimalCoder.of());
+          break;
+        case Types.BIGINT:
+          fieldCoders.add(BigEndianLongCoder.of());
+          break;
+        case Types.VARCHAR:
+        case Types.CHAR:
+          fieldCoders.add(StringUtf8Coder.of());
+          break;
+        case Types.TIME:
+          fieldCoders.add(TimeCoder.of());
+          break;
+        case Types.DATE:
+        case Types.TIMESTAMP:
+          fieldCoders.add(DateCoder.of());
+          break;
+        case Types.BOOLEAN:
+          fieldCoders.add(BooleanCoder.of());
+          break;
+
+        default:
+          throw new UnsupportedOperationException(
+              "Data type: " + fieldTypes.get(idx) + " not supported yet!");
+      }
+    }
+    return new BeamSqlRecordType(fieldNames, fieldTypes, fieldCoders);
+  }
+
+  @Override
+  public void validateValueType(int index, Object fieldValue) throws IllegalArgumentException {
+    int fieldType = fieldsType.get(index);
+    Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(fieldType);
+    if (javaClazz == null) {
+      throw new IllegalArgumentException("Data type: " + fieldType + " not supported yet!");
+    }
+
+    if (!fieldValue.getClass().equals(javaClazz)) {
+      throw new IllegalArgumentException(
+          String.format("[%s](%s) doesn't match type [%s]",
+              fieldValue, fieldValue.getClass(), fieldType));
+    }
+  }
+
+  public List<Integer> getFieldsType() {
+    return fieldsType;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj != null && obj instanceof BeamSqlRecordType) {
+      BeamSqlRecordType ins = (BeamSqlRecordType) obj;
+      return fieldsType.equals(ins.getFieldsType()) && getFieldsName().equals(ins.getFieldsName());
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return 31 * getFieldsName().hashCode() + getFieldsType().hashCode();
+  }
+}
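The create() factory above is what the rest of this patch leans on: it picks a per-field Coder for each java.sql.Types constant, and validateValueType() enforces the SQL-type-to-Java-class mapping at runtime. A small usage sketch under those definitions (the wrapper class and column names are illustrative):

    import java.sql.Types;
    import java.util.Arrays;
    import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;

    public class RecordTypeSketch {
      public static void main(String[] args) {
        // Two columns: INTEGER is coded with BigEndianIntegerCoder and
        // VARCHAR with StringUtf8Coder, per the switch in create().
        BeamSqlRecordType type = BeamSqlRecordType.create(
            Arrays.asList("id", "name"),
            Arrays.asList(Types.INTEGER, Types.VARCHAR));

        type.validateValueType(0, 1);       // passes: Integer matches Types.INTEGER
        type.validateValueType(1, "alice"); // passes: String matches Types.VARCHAR
        // type.validateValueType(0, 1L);   // would throw IllegalArgumentException
      }
    }

The test updates later in this commit follow the same route: they build a BeamSqlRecordType and ask it for the row coder via getRecordCoder() in place of the removed BeamSqlRowCoder.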
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java deleted file mode 100644 index cb5c7ea..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.schema; - -import java.util.List; -import org.apache.beam.sdk.values.BeamRecord; -import org.apache.beam.sdk.values.PCollection; - -/** - * {@link BeamSqlRow} represents one row element in a {@link PCollection}, - * with type provider {@link BeamSqlRowType}. - */ -public class BeamSqlRow extends BeamRecord { - public BeamSqlRow(BeamSqlRowType dataType, List<Object> dataValues) { - super(dataType, dataValues); - } - - public BeamSqlRow(BeamSqlRowType dataType) { - super(dataType); - } - - @Override - public BeamSqlRowType getDataType() { - return (BeamSqlRowType) super.getDataType(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java deleted file mode 100644 index c7656af..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.extensions.sql.schema; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.sql.Types; -import java.util.BitSet; -import java.util.Date; -import java.util.GregorianCalendar; -import org.apache.beam.sdk.coders.BigDecimalCoder; -import org.apache.beam.sdk.coders.BigEndianIntegerCoder; -import org.apache.beam.sdk.coders.BigEndianLongCoder; -import org.apache.beam.sdk.coders.BitSetCoder; -import org.apache.beam.sdk.coders.ByteCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.DoubleCoder; -import org.apache.beam.sdk.coders.InstantCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; - -/** - * A {@link Coder} encodes {@link BeamSqlRow}. - */ -public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> { - private BeamSqlRowType sqlRecordType; - - private static final BitSetCoder nullListCoder = BitSetCoder.of(); - - private static final StringUtf8Coder stringCoder = StringUtf8Coder.of(); - private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of(); - private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of(); - private static final DoubleCoder doubleCoder = DoubleCoder.of(); - private static final InstantCoder instantCoder = InstantCoder.of(); - private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of(); - private static final ByteCoder byteCoder = ByteCoder.of(); - - public BeamSqlRowCoder(BeamSqlRowType sqlRecordType) { - this.sqlRecordType = sqlRecordType; - } - - @Override - public void encode(BeamSqlRow value, OutputStream outStream) - throws CoderException, IOException { - nullListCoder.encode(value.getNullFields(), outStream); - for (int idx = 0; idx < value.size(); ++idx) { - if (value.getNullFields().get(idx)) { - continue; - } - - switch (sqlRecordType.getFieldsType().get(idx)) { - case Types.INTEGER: - intCoder.encode(value.getInteger(idx), outStream); - break; - case Types.SMALLINT: - intCoder.encode((int) value.getShort(idx), outStream); - break; - case Types.TINYINT: - byteCoder.encode(value.getByte(idx), outStream); - break; - case Types.DOUBLE: - doubleCoder.encode(value.getDouble(idx), outStream); - break; - case Types.FLOAT: - doubleCoder.encode((double) value.getFloat(idx), outStream); - break; - case Types.DECIMAL: - bigDecimalCoder.encode(value.getBigDecimal(idx), outStream); - break; - case Types.BIGINT: - longCoder.encode(value.getLong(idx), outStream); - break; - case Types.VARCHAR: - case Types.CHAR: - stringCoder.encode(value.getString(idx), outStream); - break; - case Types.TIME: - longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(), outStream); - break; - case Types.DATE: - case Types.TIMESTAMP: - longCoder.encode(value.getDate(idx).getTime(), outStream); - break; - case Types.BOOLEAN: - byteCoder.encode((byte) (value.getBoolean(idx) ? 
1 : 0), outStream); - break; - - default: - throw new UnsupportedOperationException( - "Data type: " + sqlRecordType.getFieldsType().get(idx) + " not supported yet!"); - } - } - - instantCoder.encode(value.getWindowStart(), outStream); - instantCoder.encode(value.getWindowEnd(), outStream); - } - - @Override - public BeamSqlRow decode(InputStream inStream) throws CoderException, IOException { - BitSet nullFields = nullListCoder.decode(inStream); - - BeamSqlRow record = new BeamSqlRow(sqlRecordType); - record.setNullFields(nullFields); - for (int idx = 0; idx < sqlRecordType.size(); ++idx) { - if (nullFields.get(idx)) { - continue; - } - - switch (sqlRecordType.getFieldsType().get(idx)) { - case Types.INTEGER: - record.addField(idx, intCoder.decode(inStream)); - break; - case Types.SMALLINT: - record.addField(idx, intCoder.decode(inStream).shortValue()); - break; - case Types.TINYINT: - record.addField(idx, byteCoder.decode(inStream)); - break; - case Types.DOUBLE: - record.addField(idx, doubleCoder.decode(inStream)); - break; - case Types.FLOAT: - record.addField(idx, doubleCoder.decode(inStream).floatValue()); - break; - case Types.BIGINT: - record.addField(idx, longCoder.decode(inStream)); - break; - case Types.DECIMAL: - record.addField(idx, bigDecimalCoder.decode(inStream)); - break; - case Types.VARCHAR: - case Types.CHAR: - record.addField(idx, stringCoder.decode(inStream)); - break; - case Types.TIME: - GregorianCalendar calendar = new GregorianCalendar(); - calendar.setTime(new Date(longCoder.decode(inStream))); - record.addField(idx, calendar); - break; - case Types.DATE: - case Types.TIMESTAMP: - record.addField(idx, new Date(longCoder.decode(inStream))); - break; - case Types.BOOLEAN: - record.addField(idx, byteCoder.decode(inStream) == 1); - break; - - default: - throw new UnsupportedOperationException("Data type: " - + sqlRecordType.getFieldsType().get(idx) - + " not supported yet!"); - } - } - - record.setWindowStart(instantCoder.decode(inStream)); - record.setWindowEnd(instantCoder.decode(inStream)); - - return record; - } - - public BeamSqlRowType getSqlRecordType() { - return sqlRecordType; - } - - @Override - public void verifyDeterministic() - throws org.apache.beam.sdk.coders.Coder.NonDeterministicException { - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java deleted file mode 100644 index 7584dad..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.schema; - -import java.math.BigDecimal; -import java.sql.Types; -import java.util.Date; -import java.util.GregorianCalendar; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.values.BeamRecordTypeProvider; - -/** - * Type provider for {@link BeamSqlRow} with SQL types. - * - * <p>Limited SQL types are supported now, visit - * <a href="https://beam.apache.org/blog/2017/07/21/sql-dsl.html#data-type">data types</a> - * for more details. - * - */ -public class BeamSqlRowType extends BeamRecordTypeProvider { - private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>(); - static { - SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class); - - SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class); - - SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class); - - SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class); - - SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class); - } - - public List<Integer> fieldsType; - - protected BeamSqlRowType(List<String> fieldsName) { - super(fieldsName); - } - - public BeamSqlRowType(List<String> fieldsName, List<Integer> fieldsType) { - super(fieldsName); - this.fieldsType = fieldsType; - } - - public static BeamSqlRowType create(List<String> fieldNames, - List<Integer> fieldTypes) { - return new BeamSqlRowType(fieldNames, fieldTypes); - } - - @Override - public void validateValueType(int index, Object fieldValue) throws IllegalArgumentException { - int fieldType = fieldsType.get(index); - Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(fieldType); - if (javaClazz == null) { - throw new IllegalArgumentException("Data type: " + fieldType + " not supported yet!"); - } - - if (!fieldValue.getClass().equals(javaClazz)) { - throw new IllegalArgumentException( - String.format("[%s](%s) doesn't match type [%s]", - fieldValue, fieldValue.getClass(), fieldType) - ); - } - } - - public List<Integer> getFieldsType() { - return fieldsType; - } - - @Override - public boolean equals(Object obj) { - if (obj != null && obj instanceof BeamSqlRowType) { - BeamSqlRowType ins = (BeamSqlRowType) obj; - return fieldsType.equals(ins.getFieldsType()) && getFieldsName().equals(ins.getFieldsName()); - } else { - return false; - } - } - - @Override - public int hashCode() { - return 31 * getFieldsName().hashCode() + getFieldsType().hashCode(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java index c179935..b370d9d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.schema; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -37,16 +38,16 @@ public interface BeamSqlTable { * create a {@code PCollection<BeamSqlRow>} from source. * */ - PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline); + PCollection<BeamRecord> buildIOReader(Pipeline pipeline); /** * create a {@code IO.write()} instance to write to target. * */ - PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter(); + PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter(); /** * Get the schema info of the table. */ - BeamSqlRowType getRowType(); + BeamSqlRecordType getRowType(); } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java index c769928..63c9720 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java @@ -23,6 +23,7 @@ import java.io.StringReader; import java.io.StringWriter; import java.math.BigDecimal; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.NlsString; import org.apache.commons.csv.CSVFormat; @@ -34,11 +35,11 @@ import org.apache.commons.csv.CSVRecord; * Utility methods for working with {@code BeamTable}. 
*/ public final class BeamTableUtils { - public static BeamSqlRow csvLine2BeamSqlRow( + public static BeamRecord csvLine2BeamSqlRow( CSVFormat csvFormat, String line, - BeamSqlRowType beamSqlRowType) { - BeamSqlRow row = new BeamSqlRow(beamSqlRowType); + BeamSqlRecordType beamSqlRowType) { + BeamRecord row = new BeamRecord(beamSqlRowType); try (StringReader reader = new StringReader(line)) { CSVParser parser = csvFormat.parse(reader); CSVRecord rawRecord = parser.getRecords().get(0); @@ -60,7 +61,7 @@ public final class BeamTableUtils { return row; } - public static String beamSqlRow2CsvLine(BeamSqlRow row, CSVFormat csvFormat) { + public static String beamSqlRow2CsvLine(BeamRecord row, CSVFormat csvFormat) { StringWriter writer = new StringWriter(); try (CSVPrinter printer = csvFormat.print(writer)) { for (int i = 0; i < row.size(); i++) { @@ -73,13 +74,14 @@ public final class BeamTableUtils { return writer.toString(); } - public static void addFieldWithAutoTypeCasting(BeamSqlRow row, int idx, Object rawObj) { + public static void addFieldWithAutoTypeCasting(BeamRecord row, int idx, Object rawObj) { if (rawObj == null) { row.addField(idx, null); return; } - SqlTypeName columnType = CalciteUtils.getFieldType(row.getDataType(), idx); + SqlTypeName columnType = CalciteUtils.getFieldType(BeamSqlRecordHelper.getSqlRecordType(row) + , idx); // auto-casting for numberics if ((rawObj instanceof String && SqlTypeName.NUMERIC_TYPES.contains(columnType)) || (rawObj instanceof BigDecimal && columnType != SqlTypeName.DECIMAL)) { http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java index 2a50947..f137379 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java @@ -18,12 +18,12 @@ package org.apache.beam.sdk.extensions.sql.schema.kafka; import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.commons.csv.CSVFormat; @@ -34,45 +34,45 @@ import org.apache.commons.csv.CSVFormat; */ public class BeamKafkaCSVTable extends BeamKafkaTable { private CSVFormat csvFormat; - public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers, + public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRowType, String bootstrapServers, List<String> topics) { this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT); } - public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers, + public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRowType, String bootstrapServers, List<String> topics, 
CSVFormat format) { super(beamSqlRowType, bootstrapServers, topics); this.csvFormat = format; } @Override - public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>> + public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>> getPTransformForInput() { return new CsvRecorderDecoder(beamSqlRowType, csvFormat); } @Override - public PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>> + public PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>> getPTransformForOutput() { return new CsvRecorderEncoder(beamSqlRowType, csvFormat); } /** - * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamSqlRow}. + * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamRecord}. * */ public static class CsvRecorderDecoder - extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>> { - private BeamSqlRowType rowType; + extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>> { + private BeamSqlRecordType rowType; private CSVFormat format; - public CsvRecorderDecoder(BeamSqlRowType rowType, CSVFormat format) { + public CsvRecorderDecoder(BeamSqlRecordType rowType, CSVFormat format) { this.rowType = rowType; this.format = format; } @Override - public PCollection<BeamSqlRow> expand(PCollection<KV<byte[], byte[]>> input) { - return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamSqlRow>() { + public PCollection<BeamRecord> expand(PCollection<KV<byte[], byte[]>> input) { + return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamRecord>() { @ProcessElement public void processElement(ProcessContext c) { String rowInString = new String(c.element().getValue()); @@ -83,24 +83,24 @@ public class BeamKafkaCSVTable extends BeamKafkaTable { } /** - * A PTransform to convert {@link BeamSqlRow} to {@code KV<byte[], byte[]>}. + * A PTransform to convert {@link BeamRecord} to {@code KV<byte[], byte[]>}. 
* */ public static class CsvRecorderEncoder - extends PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>> { - private BeamSqlRowType rowType; + extends PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>> { + private BeamSqlRecordType rowType; private CSVFormat format; - public CsvRecorderEncoder(BeamSqlRowType rowType, CSVFormat format) { + public CsvRecorderEncoder(BeamSqlRecordType rowType, CSVFormat format) { this.rowType = rowType; this.format = format; } @Override - public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamSqlRow> input) { - return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSqlRow, KV<byte[], byte[]>>() { + public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamRecord> input) { + return input.apply("encodeRecord", ParDo.of(new DoFn<BeamRecord, KV<byte[], byte[]>>() { @ProcessElement public void processElement(ProcessContext c) { - BeamSqlRow in = c.element(); + BeamRecord in = c.element(); c.output(KV.of(new byte[] {}, BeamTableUtils.beamSqlRow2CsvLine(in, format).getBytes())); } })); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java index 2cc664f..fac57bf 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java @@ -26,10 +26,10 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; import org.apache.beam.sdk.extensions.sql.schema.BeamIOType; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -48,11 +48,11 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab private List<String> topics; private Map<String, Object> configUpdates; - protected BeamKafkaTable(BeamSqlRowType beamSqlRowType) { + protected BeamKafkaTable(BeamSqlRecordType beamSqlRowType) { super(beamSqlRowType); } - public BeamKafkaTable(BeamSqlRowType beamSqlRowType, String bootstrapServers, + public BeamKafkaTable(BeamSqlRecordType beamSqlRowType, String bootstrapServers, List<String> topics) { super(beamSqlRowType); this.bootstrapServers = bootstrapServers; @@ -69,14 +69,14 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab return BeamIOType.UNBOUNDED; } - public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>> + public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>> getPTransformForInput(); - public abstract PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>> + public abstract PTransform<PCollection<BeamRecord>, 
PCollection<KV<byte[], byte[]>>> getPTransformForOutput(); @Override - public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) { + public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) { return PBegin.in(pipeline).apply("read", KafkaIO.<byte[], byte[]>read() .withBootstrapServers(bootstrapServers) @@ -89,13 +89,13 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab } @Override - public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() { + public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() { checkArgument(topics != null && topics.size() == 1, "Only one topic can be acceptable as output."); - return new PTransform<PCollection<BeamSqlRow>, PDone>() { + return new PTransform<PCollection<BeamRecord>, PDone>() { @Override - public PDone expand(PCollection<BeamSqlRow> input) { + public PDone expand(PCollection<BeamRecord> input) { return input.apply("out_reformat", getPTransformForOutput()).apply("persistent", KafkaIO.<byte[], byte[]>write() .withBootstrapServers(bootstrapServers) http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java index c44faab..0ec418c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java @@ -19,10 +19,10 @@ package org.apache.beam.sdk.extensions.sql.schema.text; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -46,25 +46,25 @@ public class BeamTextCSVTable extends BeamTextTable { /** * CSV table with {@link CSVFormat#DEFAULT DEFAULT} format. */ - public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern) { + public BeamTextCSVTable(BeamSqlRecordType beamSqlRowType, String filePattern) { this(beamSqlRowType, filePattern, CSVFormat.DEFAULT); } - public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern, + public BeamTextCSVTable(BeamSqlRecordType beamSqlRowType, String filePattern, CSVFormat csvFormat) { super(beamSqlRowType, filePattern); this.csvFormat = csvFormat; } @Override - public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) { + public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) { return PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern)) .apply("parseCSVLine", new BeamTextCSVTableIOReader(beamSqlRowType, filePattern, csvFormat)); } @Override - public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() { + public PTransform<? 
super PCollection<BeamRecord>, PDone> buildIOWriter() { return new BeamTextCSVTableIOWriter(beamSqlRowType, filePattern, csvFormat); } } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java index 06109c3..ecb77e0 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java @@ -19,12 +19,12 @@ package org.apache.beam.sdk.extensions.sql.schema.text; import java.io.Serializable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.commons.csv.CSVFormat; @@ -32,13 +32,13 @@ import org.apache.commons.csv.CSVFormat; * IOReader for {@code BeamTextCSVTable}. */ public class BeamTextCSVTableIOReader - extends PTransform<PCollection<String>, PCollection<BeamSqlRow>> + extends PTransform<PCollection<String>, PCollection<BeamRecord>> implements Serializable { private String filePattern; - protected BeamSqlRowType beamSqlRowType; + protected BeamSqlRecordType beamSqlRowType; protected CSVFormat csvFormat; - public BeamTextCSVTableIOReader(BeamSqlRowType beamSqlRowType, String filePattern, + public BeamTextCSVTableIOReader(BeamSqlRecordType beamSqlRowType, String filePattern, CSVFormat csvFormat) { this.filePattern = filePattern; this.beamSqlRowType = beamSqlRowType; @@ -46,8 +46,8 @@ public class BeamTextCSVTableIOReader } @Override - public PCollection<BeamSqlRow> expand(PCollection<String> input) { - return input.apply(ParDo.of(new DoFn<String, BeamSqlRow>() { + public PCollection<BeamRecord> expand(PCollection<String> input) { + return input.apply(ParDo.of(new DoFn<String, BeamRecord>() { @ProcessElement public void processElement(ProcessContext ctx) { String str = ctx.element(); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java index 1684b37..c616973 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java @@ -19,13 +19,13 @@ package org.apache.beam.sdk.extensions.sql.schema.text; import 
java.io.Serializable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.commons.csv.CSVFormat; @@ -33,24 +33,24 @@ import org.apache.commons.csv.CSVFormat; /** * IOWriter for {@code BeamTextCSVTable}. */ -public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSqlRow>, PDone> +public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamRecord>, PDone> implements Serializable { private String filePattern; - protected BeamSqlRowType beamSqlRowType; + protected BeamSqlRecordType beamSqlRowType; protected CSVFormat csvFormat; - public BeamTextCSVTableIOWriter(BeamSqlRowType beamSqlRowType, String filePattern, + public BeamTextCSVTableIOWriter(BeamSqlRecordType beamSqlRowType, String filePattern, CSVFormat csvFormat) { this.filePattern = filePattern; this.beamSqlRowType = beamSqlRowType; this.csvFormat = csvFormat; } - @Override public PDone expand(PCollection<BeamSqlRow> input) { - return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSqlRow, String>() { + @Override public PDone expand(PCollection<BeamRecord> input) { + return input.apply("encodeRecord", ParDo.of(new DoFn<BeamRecord, String>() { @ProcessElement public void processElement(ProcessContext ctx) { - BeamSqlRow row = ctx.element(); + BeamRecord row = ctx.element(); ctx.output(BeamTableUtils.beamSqlRow2CsvLine(row, csvFormat)); } })).apply(TextIO.write().to(filePattern)); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java index e85608d..4284366 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java @@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.schema.text; import java.io.Serializable; import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; import org.apache.beam.sdk.extensions.sql.schema.BeamIOType; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; /** * {@code BeamTextTable} represents a text file/directory(backed by {@code TextIO}). 
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; public abstract class BeamTextTable extends BaseBeamTable implements Serializable { protected String filePattern; - protected BeamTextTable(BeamSqlRowType beamSqlRowType, String filePattern) { + protected BeamTextTable(BeamSqlRecordType beamSqlRowType, String filePattern) { super(beamSqlRowType); this.filePattern = filePattern; } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java index e6ca18f..8501157 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java @@ -19,9 +19,9 @@ package org.apache.beam.sdk.extensions.sql; import java.sql.Types; import java.util.Arrays; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; @@ -49,16 +49,16 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { runAggregationWithoutWindow(unboundedInput1); } - private void runAggregationWithoutWindow(PCollection<BeamSqlRow> input) throws Exception { + private void runAggregationWithoutWindow(PCollection<BeamRecord> input) throws Exception { String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2"; - PCollection<BeamSqlRow> result = + PCollection<BeamRecord> result = input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql)); - BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "size"), + BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"), Arrays.asList(Types.INTEGER, Types.BIGINT)); - BeamSqlRow record = new BeamSqlRow(resultType); + BeamRecord record = new BeamRecord(resultType); record.addField("f_int2", 0); record.addField("size", 4L); @@ -83,7 +83,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { runAggregationFunctions(unboundedInput1); } - private void runAggregationFunctions(PCollection<BeamSqlRow> input) throws Exception{ + private void runAggregationFunctions(PCollection<BeamRecord> input) throws Exception{ String sql = "select f_int2, count(*) as size, " + "sum(f_long) as sum1, avg(f_long) as avg1, max(f_long) as max1, min(f_long) as min1," + "sum(f_short) as sum2, avg(f_short) as avg2, max(f_short) as max2, min(f_short) as min2," @@ -94,11 +94,11 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { + "max(f_timestamp) as max6, min(f_timestamp) as min6 " + "FROM TABLE_A group by f_int2"; - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) + PCollection<BeamRecord> result = + PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input) .apply("testAggregationFunctions", BeamSql.query(sql)); - 
BeamSqlRowType resultType = BeamSqlRowType.create( + BeamSqlRecordType resultType = BeamSqlRecordType.create( Arrays.asList("f_int2", "size", "sum1", "avg1", "max1", "min1", "sum2", "avg2", "max2", "min2", "sum3", "avg3", "max3", "min3", "sum4", "avg4", "max4", "min4", "sum5", "avg5", "max5", "min5", "max6", "min6"), @@ -108,7 +108,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { Types.FLOAT, Types.FLOAT, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, Types.TIMESTAMP, Types.TIMESTAMP)); - BeamSqlRow record = new BeamSqlRow(resultType); + BeamRecord record = new BeamRecord(resultType); record.addField("f_int2", 0); record.addField("size", 4L); @@ -161,28 +161,28 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { runDistinct(unboundedInput1); } - private void runDistinct(PCollection<BeamSqlRow> input) throws Exception { + private void runDistinct(PCollection<BeamRecord> input) throws Exception { String sql = "SELECT distinct f_int, f_long FROM PCOLLECTION "; - PCollection<BeamSqlRow> result = + PCollection<BeamRecord> result = input.apply("testDistinct", BeamSql.simpleQuery(sql)); - BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"), + BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"), Arrays.asList(Types.INTEGER, Types.BIGINT)); - BeamSqlRow record1 = new BeamSqlRow(resultType); + BeamRecord record1 = new BeamRecord(resultType); record1.addField("f_int", 1); record1.addField("f_long", 1000L); - BeamSqlRow record2 = new BeamSqlRow(resultType); + BeamRecord record2 = new BeamRecord(resultType); record2.addField("f_int", 2); record2.addField("f_long", 2000L); - BeamSqlRow record3 = new BeamSqlRow(resultType); + BeamRecord record3 = new BeamRecord(resultType); record3.addField("f_int", 3); record3.addField("f_long", 3000L); - BeamSqlRow record4 = new BeamSqlRow(resultType); + BeamRecord record4 = new BeamRecord(resultType); record4.addField("f_int", 4); record4.addField("f_long", 4000L); @@ -207,27 +207,27 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { runTumbleWindow(unboundedInput1); } - private void runTumbleWindow(PCollection<BeamSqlRow> input) throws Exception { + private void runTumbleWindow(PCollection<BeamRecord> input) throws Exception { String sql = "SELECT f_int2, COUNT(*) AS `size`," + " TUMBLE_START(f_timestamp, INTERVAL '1' HOUR) AS `window_start`" + " FROM TABLE_A" + " GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)"; - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) + PCollection<BeamRecord> result = + PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input) .apply("testTumbleWindow", BeamSql.query(sql)); - BeamSqlRowType resultType = BeamSqlRowType.create( + BeamSqlRecordType resultType = BeamSqlRecordType.create( Arrays.asList("f_int2", "size", "window_start"), Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP)); - BeamSqlRow record1 = new BeamSqlRow(resultType); + BeamRecord record1 = new BeamRecord(resultType); record1.addField("f_int2", 0); record1.addField("size", 3L); record1.addField("window_start", FORMAT.parse("2017-01-01 01:00:00")); record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime())); record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime())); - BeamSqlRow record2 = new BeamSqlRow(resultType); + BeamRecord record2 = new BeamRecord(resultType); record2.addField("f_int2", 0); record2.addField("size", 1L); 
record2.addField("window_start", FORMAT.parse("2017-01-01 02:00:00")); @@ -255,40 +255,40 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { runHopWindow(unboundedInput1); } - private void runHopWindow(PCollection<BeamSqlRow> input) throws Exception { + private void runHopWindow(PCollection<BeamRecord> input) throws Exception { String sql = "SELECT f_int2, COUNT(*) AS `size`," + " HOP_START(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE) AS `window_start`" + " FROM PCOLLECTION" + " GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)"; - PCollection<BeamSqlRow> result = + PCollection<BeamRecord> result = input.apply("testHopWindow", BeamSql.simpleQuery(sql)); - BeamSqlRowType resultType = BeamSqlRowType.create( + BeamSqlRecordType resultType = BeamSqlRecordType.create( Arrays.asList("f_int2", "size", "window_start"), Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP)); - BeamSqlRow record1 = new BeamSqlRow(resultType); + BeamRecord record1 = new BeamRecord(resultType); record1.addField("f_int2", 0); record1.addField("size", 3L); record1.addField("window_start", FORMAT.parse("2017-01-01 00:30:00")); record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 00:30:00").getTime())); record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime())); - BeamSqlRow record2 = new BeamSqlRow(resultType); + BeamRecord record2 = new BeamRecord(resultType); record2.addField("f_int2", 0); record2.addField("size", 3L); record2.addField("window_start", FORMAT.parse("2017-01-01 01:00:00")); record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime())); record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime())); - BeamSqlRow record3 = new BeamSqlRow(resultType); + BeamRecord record3 = new BeamRecord(resultType); record3.addField("f_int2", 0); record3.addField("size", 1L); record3.addField("window_start", FORMAT.parse("2017-01-01 01:30:00")); record3.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime())); record3.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:30:00").getTime())); - BeamSqlRow record4 = new BeamSqlRow(resultType); + BeamRecord record4 = new BeamRecord(resultType); record4.addField("f_int2", 0); record4.addField("size", 1L); record4.addField("window_start", FORMAT.parse("2017-01-01 02:00:00")); @@ -316,27 +316,27 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { runSessionWindow(unboundedInput1); } - private void runSessionWindow(PCollection<BeamSqlRow> input) throws Exception { + private void runSessionWindow(PCollection<BeamRecord> input) throws Exception { String sql = "SELECT f_int2, COUNT(*) AS `size`," + " SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS `window_start`" + " FROM TABLE_A" + " GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)"; - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) + PCollection<BeamRecord> result = + PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input) .apply("testSessionWindow", BeamSql.query(sql)); - BeamSqlRowType resultType = BeamSqlRowType.create( + BeamSqlRecordType resultType = BeamSqlRecordType.create( Arrays.asList("f_int2", "size", "window_start"), Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP)); - BeamSqlRow record1 = new BeamSqlRow(resultType); + BeamRecord record1 = new BeamRecord(resultType); record1.addField("f_int2", 0); record1.addField("size", 3L); record1.addField("window_start", FORMAT.parse("2017-01-01 
01:01:03")); record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:01:03").getTime())); record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:11:03").getTime())); - BeamSqlRow record2 = new BeamSqlRow(resultType); + BeamRecord record2 = new BeamRecord(resultType); record2.addField("f_int2", 0); record2.addField("size", 1L); record2.addField("window_start", FORMAT.parse("2017-01-01 02:04:03")); @@ -357,8 +357,8 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A " + "GROUP BY f_int2, TUMBLE(f_long, INTERVAL '1' HOUR)"; - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1) + PCollection<BeamRecord> result = + PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), boundedInput1) .apply("testWindowOnNonTimestampField", BeamSql.query(sql)); pipeline.run().waitUntilFinish(); @@ -372,7 +372,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { String sql = "SELECT f_int2, COUNT(DISTINCT *) AS `size` FROM PCOLLECTION GROUP BY f_int2"; - PCollection<BeamSqlRow> result = + PCollection<BeamRecord> result = boundedInput1.apply("testUnsupportedDistinct", BeamSql.simpleQuery(sql)); pipeline.run().waitUntilFinish(); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java index 0c1ce1c..d09caf0 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java @@ -25,12 +25,11 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Instant; @@ -53,20 +52,20 @@ public class BeamSqlDslBase { @Rule public ExpectedException exceptions = ExpectedException.none(); - public static BeamSqlRowType rowTypeInTableA; - public static List<BeamSqlRow> recordsInTableA; + public static BeamSqlRecordType rowTypeInTableA; + public static List<BeamRecord> recordsInTableA; //bounded PCollections - public PCollection<BeamSqlRow> boundedInput1; - public PCollection<BeamSqlRow> boundedInput2; + public PCollection<BeamRecord> boundedInput1; + public PCollection<BeamRecord> boundedInput2; //unbounded PCollections - public PCollection<BeamSqlRow> unboundedInput1; - public PCollection<BeamSqlRow> unboundedInput2; + public PCollection<BeamRecord> unboundedInput1; + public PCollection<BeamRecord> unboundedInput2; @BeforeClass public static void prepareClass() throws ParseException { - rowTypeInTableA = BeamSqlRowType.create( + rowTypeInTableA = 
BeamSqlRecordType.create( Arrays.asList("f_int", "f_long", "f_short", "f_byte", "f_float", "f_double", "f_string", "f_timestamp", "f_int2", "f_decimal"), Arrays.asList(Types.INTEGER, Types.BIGINT, Types.SMALLINT, Types.TINYINT, Types.FLOAT, @@ -78,20 +77,20 @@ public class BeamSqlDslBase { @Before public void preparePCollections(){ boundedInput1 = PBegin.in(pipeline).apply("boundedInput1", - Create.of(recordsInTableA).withCoder(new BeamSqlRowCoder(rowTypeInTableA))); + Create.of(recordsInTableA).withCoder(rowTypeInTableA.getRecordCoder())); boundedInput2 = PBegin.in(pipeline).apply("boundedInput2", - Create.of(recordsInTableA.get(0)).withCoder(new BeamSqlRowCoder(rowTypeInTableA))); + Create.of(recordsInTableA.get(0)).withCoder(rowTypeInTableA.getRecordCoder())); unboundedInput1 = prepareUnboundedPCollection1(); unboundedInput2 = prepareUnboundedPCollection2(); } - private PCollection<BeamSqlRow> prepareUnboundedPCollection1() { - TestStream.Builder<BeamSqlRow> values = TestStream - .create(new BeamSqlRowCoder(rowTypeInTableA)); + private PCollection<BeamRecord> prepareUnboundedPCollection1() { + TestStream.Builder<BeamRecord> values = TestStream + .create(rowTypeInTableA.getRecordCoder()); - for (BeamSqlRow row : recordsInTableA) { + for (BeamRecord row : recordsInTableA) { values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp"))); values = values.addElements(row); } @@ -99,21 +98,21 @@ public class BeamSqlDslBase { return PBegin.in(pipeline).apply("unboundedInput1", values.advanceWatermarkToInfinity()); } - private PCollection<BeamSqlRow> prepareUnboundedPCollection2() { - TestStream.Builder<BeamSqlRow> values = TestStream - .create(new BeamSqlRowCoder(rowTypeInTableA)); + private PCollection<BeamRecord> prepareUnboundedPCollection2() { + TestStream.Builder<BeamRecord> values = TestStream + .create(rowTypeInTableA.getRecordCoder()); - BeamSqlRow row = recordsInTableA.get(0); + BeamRecord row = recordsInTableA.get(0); values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp"))); values = values.addElements(row); return PBegin.in(pipeline).apply("unboundedInput2", values.advanceWatermarkToInfinity()); } - private static List<BeamSqlRow> prepareInputRowsInTableA() throws ParseException{ - List<BeamSqlRow> rows = new ArrayList<>(); + private static List<BeamRecord> prepareInputRowsInTableA() throws ParseException{ + List<BeamRecord> rows = new ArrayList<>(); - BeamSqlRow row1 = new BeamSqlRow(rowTypeInTableA); + BeamRecord row1 = new BeamRecord(rowTypeInTableA); row1.addField(0, 1); row1.addField(1, 1000L); row1.addField(2, Short.valueOf("1")); @@ -126,7 +125,7 @@ public class BeamSqlDslBase { row1.addField(9, new BigDecimal(1)); rows.add(row1); - BeamSqlRow row2 = new BeamSqlRow(rowTypeInTableA); + BeamRecord row2 = new BeamRecord(rowTypeInTableA); row2.addField(0, 2); row2.addField(1, 2000L); row2.addField(2, Short.valueOf("2")); @@ -139,7 +138,7 @@ public class BeamSqlDslBase { row2.addField(9, new BigDecimal(2)); rows.add(row2); - BeamSqlRow row3 = new BeamSqlRow(rowTypeInTableA); + BeamRecord row3 = new BeamRecord(rowTypeInTableA); row3.addField(0, 3); row3.addField(1, 3000L); row3.addField(2, Short.valueOf("3")); @@ -152,7 +151,7 @@ public class BeamSqlDslBase { row3.addField(9, new BigDecimal(3)); rows.add(row3); - BeamSqlRow row4 = new BeamSqlRow(rowTypeInTableA); + BeamRecord row4 = new BeamRecord(rowTypeInTableA); row4.addField(0, 4); row4.addField(1, 4000L); row4.addField(2, Short.valueOf("4")); 
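Taken together, the BeamSqlDslBase changes above show the shape of the migrated test setup: a BeamSqlRecordType describes the row once, BeamRecord instances are populated against it, and the coder now comes from the type itself via getRecordCoder() rather than from a hand-built BeamSqlRowCoder. The fragment below is a minimal sketch of that pattern assembled only from calls visible in this diff; the two-column schema and class name are hypothetical, not part of the commit.

    import java.sql.Types;
    import java.util.Arrays;
    import org.apache.beam.sdk.coders.BeamRecordCoder;
    import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
    import org.apache.beam.sdk.values.BeamRecord;

    public class RecordSetupSketch {
      public static BeamRecord sampleRecord() {
        // Declare the row shape once; names and SQL types are positional pairs.
        BeamSqlRecordType type = BeamSqlRecordType.create(
            Arrays.asList("f_int", "f_string"),
            Arrays.asList(Types.INTEGER, Types.VARCHAR));

        // The record type hands out its own coder now; the tests above pass
        // this to Create.of(...).withCoder(...) and TestStream.create(...).
        BeamRecordCoder coder = type.getRecordCoder();

        // Populate a record field by field, as prepareInputRowsInTableA does.
        BeamRecord record = new BeamRecord(type);
        record.addField("f_int", 1);
        record.addField("f_string", "hello");
        return record;
      }
    }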
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java
index 16b6426..e1d463b 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.sdk.extensions.sql;
 
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
@@ -44,10 +44,10 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
     runSingleFilter(unboundedInput1);
   }
 
-  private void runSingleFilter(PCollection<BeamSqlRow> input) throws Exception {
+  private void runSingleFilter(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT * FROM PCOLLECTION WHERE f_int = 1";
 
-    PCollection<BeamSqlRow> result =
+    PCollection<BeamRecord> result =
         input.apply("testSingleFilter", BeamSql.simpleQuery(sql));
 
     PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0));
@@ -71,12 +71,12 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
     runCompositeFilter(unboundedInput1);
   }
 
-  private void runCompositeFilter(PCollection<BeamSqlRow> input) throws Exception {
+  private void runCompositeFilter(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT * FROM TABLE_A"
        + " WHERE f_int > 1 AND (f_long < 3000 OR f_string = 'string_row3')";
 
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+    PCollection<BeamRecord> result =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
            .apply("testCompositeFilter", BeamSql.query(sql));
 
     PAssert.that(result).containsInAnyOrder(recordsInTableA.get(1), recordsInTableA.get(2));
@@ -100,11 +100,11 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
     runNoReturnFilter(unboundedInput1);
   }
 
-  private void runNoReturnFilter(PCollection<BeamSqlRow> input) throws Exception {
+  private void runNoReturnFilter(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT * FROM TABLE_A WHERE f_int < 1";
 
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+    PCollection<BeamRecord> result =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
            .apply("testNoReturnFilter", BeamSql.query(sql));
 
     PAssert.that(result).empty();
@@ -120,8 +120,8 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
 
     String sql = "SELECT * FROM TABLE_B WHERE f_int < 1";
 
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
+    PCollection<BeamRecord> result =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), boundedInput1)
            .apply("testFromInvalidTableName1", BeamSql.query(sql));
 
     pipeline.run().waitUntilFinish();
@@ -135,7 +135,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
 
     String sql = "SELECT * FROM PCOLLECTION_NA";
 
-    PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql));
+    PCollection<BeamRecord> result = boundedInput1.apply(BeamSql.simpleQuery(sql));
 
     pipeline.run().waitUntilFinish();
   }
@@ -148,7 +148,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
 
     String sql = "SELECT * FROM PCOLLECTION WHERE f_int_na = 0";
 
-    PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql));
+    PCollection<BeamRecord> result = boundedInput1.apply(BeamSql.simpleQuery(sql));
 
     pipeline.run().waitUntilFinish();
   }
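The filter tests exercise both entry points of the DSL: BeamSql.simpleQuery(sql) applies directly to a single PCollection, which the SQL addresses as PCOLLECTION, while BeamSql.query(sql) runs over a PCollectionTuple whose TupleTag names become the table names. A sketch of the two forms under those assumptions, using only calls that appear in the diff (the class and method names are illustrative):

    import org.apache.beam.sdk.extensions.sql.BeamSql;
    import org.apache.beam.sdk.values.BeamRecord;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;

    public class QueryStyleSketch {
      // Single-input form: the input is implicitly registered as PCOLLECTION.
      static PCollection<BeamRecord> single(PCollection<BeamRecord> input) {
        return input.apply("singleFilter",
            BeamSql.simpleQuery("SELECT * FROM PCOLLECTION WHERE f_int = 1"));
      }

      // Multi-input form: each TupleTag name is a table name in the SQL.
      static PCollection<BeamRecord> tagged(PCollection<BeamRecord> input) {
        return PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
            .apply("taggedFilter", BeamSql.query("SELECT * FROM TABLE_A WHERE f_int < 1"));
      }
    }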
TupleTag<BeamRecord>("ORDER_DETAILS2"), ORDER_DETAILS2.buildIOReader(pipeline).setCoder(SOURCE_CODER) ).apply("join", BeamSql.query(sql)).setCoder(RESULT_CODER); } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java index 6468011..ddb90d5 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java @@ -19,9 +19,9 @@ package org.apache.beam.sdk.extensions.sql; import java.sql.Types; import java.util.Arrays; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; @@ -47,10 +47,10 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { runSelectAll(unboundedInput2); } - private void runSelectAll(PCollection<BeamSqlRow> input) throws Exception { + private void runSelectAll(PCollection<BeamRecord> input) throws Exception { String sql = "SELECT * FROM PCOLLECTION"; - PCollection<BeamSqlRow> result = + PCollection<BeamRecord> result = input.apply("testSelectAll", BeamSql.simpleQuery(sql)); PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0)); @@ -74,17 +74,17 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { runPartialFields(unboundedInput2); } - private void runPartialFields(PCollection<BeamSqlRow> input) throws Exception { + private void runPartialFields(PCollection<BeamRecord> input) throws Exception { String sql = "SELECT f_int, f_long FROM TABLE_A"; - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) + PCollection<BeamRecord> result = + PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input) .apply("testPartialFields", BeamSql.query(sql)); - BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"), + BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"), Arrays.asList(Types.INTEGER, Types.BIGINT)); - BeamSqlRow record = new BeamSqlRow(resultType); + BeamRecord record = new BeamRecord(resultType); record.addField("f_int", recordsInTableA.get(0).getFieldValue(0)); record.addField("f_long", recordsInTableA.get(0).getFieldValue(1)); @@ -109,29 +109,29 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { runPartialFieldsInMultipleRow(unboundedInput1); } - private void runPartialFieldsInMultipleRow(PCollection<BeamSqlRow> input) throws Exception { + private void runPartialFieldsInMultipleRow(PCollection<BeamRecord> input) throws Exception { String sql = "SELECT f_int, f_long FROM TABLE_A"; - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) + PCollection<BeamRecord> result = + PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input) .apply("testPartialFieldsInMultipleRow", 
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
index 6468011..ddb90d5 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
@@ -19,9 +19,9 @@ package org.apache.beam.sdk.extensions.sql;
 
 import java.sql.Types;
 import java.util.Arrays;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
@@ -47,10 +47,10 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
     runSelectAll(unboundedInput2);
   }
 
-  private void runSelectAll(PCollection<BeamSqlRow> input) throws Exception {
+  private void runSelectAll(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT * FROM PCOLLECTION";
 
-    PCollection<BeamSqlRow> result =
+    PCollection<BeamRecord> result =
         input.apply("testSelectAll", BeamSql.simpleQuery(sql));
 
     PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0));
@@ -74,17 +74,17 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
     runPartialFields(unboundedInput2);
   }
 
-  private void runPartialFields(PCollection<BeamSqlRow> input) throws Exception {
+  private void runPartialFields(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT f_int, f_long FROM TABLE_A";
 
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+    PCollection<BeamRecord> result =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
            .apply("testPartialFields", BeamSql.query(sql));
 
-    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
-    BeamSqlRow record = new BeamSqlRow(resultType);
+    BeamRecord record = new BeamRecord(resultType);
     record.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
     record.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
 
@@ -109,29 +109,29 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
     runPartialFieldsInMultipleRow(unboundedInput1);
   }
 
-  private void runPartialFieldsInMultipleRow(PCollection<BeamSqlRow> input) throws Exception {
+  private void runPartialFieldsInMultipleRow(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT f_int, f_long FROM TABLE_A";
 
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+    PCollection<BeamRecord> result =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
            .apply("testPartialFieldsInMultipleRow", BeamSql.query(sql));
 
-    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
-    BeamSqlRow record1 = new BeamSqlRow(resultType);
+    BeamRecord record1 = new BeamRecord(resultType);
     record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
     record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
 
-    BeamSqlRow record2 = new BeamSqlRow(resultType);
+    BeamRecord record2 = new BeamRecord(resultType);
     record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0));
     record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1));
 
-    BeamSqlRow record3 = new BeamSqlRow(resultType);
+    BeamRecord record3 = new BeamRecord(resultType);
     record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0));
     record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1));
 
-    BeamSqlRow record4 = new BeamSqlRow(resultType);
+    BeamRecord record4 = new BeamRecord(resultType);
     record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0));
     record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1));
 
@@ -156,29 +156,29 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
     runPartialFieldsInRows(unboundedInput1);
   }
 
-  private void runPartialFieldsInRows(PCollection<BeamSqlRow> input) throws Exception {
+  private void runPartialFieldsInRows(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT f_int, f_long FROM TABLE_A";
 
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+    PCollection<BeamRecord> result =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
            .apply("testPartialFieldsInRows", BeamSql.query(sql));
 
-    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
-    BeamSqlRow record1 = new BeamSqlRow(resultType);
+    BeamRecord record1 = new BeamRecord(resultType);
     record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
     record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
 
-    BeamSqlRow record2 = new BeamSqlRow(resultType);
+    BeamRecord record2 = new BeamRecord(resultType);
     record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0));
     record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1));
 
-    BeamSqlRow record3 = new BeamSqlRow(resultType);
+    BeamRecord record3 = new BeamRecord(resultType);
     record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0));
     record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1));
 
-    BeamSqlRow record4 = new BeamSqlRow(resultType);
+    BeamRecord record4 = new BeamRecord(resultType);
     record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0));
     record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1));
 
@@ -203,17 +203,17 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
     runLiteralField(unboundedInput2);
   }
 
-  public void runLiteralField(PCollection<BeamSqlRow> input) throws Exception {
+  public void runLiteralField(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT 1 as literal_field FROM TABLE_A";
 
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+    PCollection<BeamRecord> result =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
            .apply("testLiteralField", BeamSql.query(sql));
 
-    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("literal_field"),
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("literal_field"),
         Arrays.asList(Types.INTEGER));
 
-    BeamSqlRow record = new BeamSqlRow(resultType);
+    BeamRecord record = new BeamRecord(resultType);
     record.addField("literal_field", 1);
 
     PAssert.that(result).containsInAnyOrder(record);
@@ -229,8 +229,8 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
 
     String sql = "SELECT f_int_na FROM TABLE_A";
 
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
+    PCollection<BeamRecord> result =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), boundedInput1)
            .apply("testProjectUnknownField", BeamSql.query(sql));
 
     pipeline.run().waitUntilFinish();
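Finally, the projection tests all close with the same verification idiom: expected BeamRecord values are built against the result type, the expectation is registered with PAssert, and the pipeline is then run to completion. A minimal sketch of that flow, again using only calls present in the diff (the helper name and class wrapper are hypothetical):

    import org.apache.beam.sdk.testing.PAssert;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.values.BeamRecord;
    import org.apache.beam.sdk.values.PCollection;

    public class AssertSketch {
      // Assertions are declared before run(); they are evaluated when the
      // pipeline executes and fail the test on any mismatch.
      static void check(TestPipeline pipeline,
          PCollection<BeamRecord> result, BeamRecord expected) {
        PAssert.that(result).containsInAnyOrder(expected);
        pipeline.run().waitUntilFinish();
      }
    }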
