Repository: beam Updated Branches: refs/heads/DSL_SQL 10962a34d -> 8f922f74b
move BeamRecord to sdk/core Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52933a64 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52933a64 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52933a64 Branch: refs/heads/DSL_SQL Commit: 52933a640393a107eddbd3d88670507b03595e1f Parents: 10962a3 Author: mingmxu <[email protected]> Authored: Wed Aug 2 01:20:50 2017 -0700 Committer: mingmxu <[email protected]> Committed: Wed Aug 2 23:52:29 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/coders/BeamRecordCoder.java | 84 ++++++ .../org/apache/beam/sdk/values/BeamRecord.java | 279 ++++++++++++++++++ .../beam/sdk/values/BeamRecordTypeProvider.java | 59 ++++ .../apache/beam/sdk/extensions/sql/BeamSql.java | 2 +- .../beam/sdk/extensions/sql/BeamSqlCli.java | 4 +- .../sdk/extensions/sql/schema/BeamSqlRow.java | 293 +------------------ .../extensions/sql/schema/BeamSqlRowCoder.java | 79 ++--- .../extensions/sql/schema/BeamSqlRowType.java | 91 +++++- 8 files changed, 555 insertions(+), 336 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/52933a64/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java new file mode 100644 index 0000000..ad27f4e --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.coders; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.values.BeamRecord; +import org.apache.beam.sdk.values.BeamRecordTypeProvider; + +/** + * A {@link Coder} for {@link BeamRecord}. It wraps the {@link Coder} for each element directly. 
+ */ +@Experimental +public class BeamRecordCoder extends CustomCoder<BeamRecord> { + private static final ListCoder<Integer> nullListCoder = ListCoder.of(BigEndianIntegerCoder.of()); + private static final InstantCoder instantCoder = InstantCoder.of(); + + private BeamRecordTypeProvider recordType; + private List<Coder> coderArray; + + public BeamRecordCoder(BeamRecordTypeProvider recordType, List<Coder> coderArray) { + this.recordType = recordType; + this.coderArray = coderArray; + } + + @Override + public void encode(BeamRecord value, OutputStream outStream) + throws CoderException, IOException { + nullListCoder.encode(value.getNullFields(), outStream); + for (int idx = 0; idx < value.size(); ++idx) { + if (value.getNullFields().contains(idx)) { + continue; + } + + /* Each field is written by its own coder, so pass the untyped value; getInteger(idx) would throw ClassCastException for any non-Integer field. */ coderArray.get(idx).encode(value.getFieldValue(idx), outStream); + } + + instantCoder.encode(value.getWindowStart(), outStream); + instantCoder.encode(value.getWindowEnd(), outStream); + } + + @Override + public BeamRecord decode(InputStream inStream) throws CoderException, IOException { + List<Integer> nullFields = nullListCoder.decode(inStream); + + BeamRecord record = new BeamRecord(recordType); + record.setNullFields(nullFields); + for (int idx = 0; idx < recordType.size(); ++idx) { + if (nullFields.contains(idx)) { + continue; + } + + record.addField(idx, coderArray.get(idx).decode(inStream)); + } + + record.setWindowStart(instantCoder.decode(inStream)); + record.setWindowEnd(instantCoder.decode(inStream)); + + return record; + } + + @Override + public void verifyDeterministic() + throws org.apache.beam.sdk.coders.Coder.NonDeterministicException { /* NOTE(review): empty body, so this coder never reports non-determinism even if a coder in coderArray is non-deterministic; confirm this is intended. */ + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/52933a64/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java new file mode 100644 index 0000000..d1c1c17 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.values; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.joda.time.Instant; + +/** + * {@link org.apache.beam.sdk.values.BeamRecord}, self-described with + * {@link BeamRecordTypeProvider}, represents one element in a + * {@link org.apache.beam.sdk.values.PCollection}. + */ +@Experimental +public class BeamRecord implements Serializable { + //null values are indexed here, to handle properly in Coder.
+ private List<Integer> nullFields = new ArrayList<>(); + private List<Object> dataValues; + private BeamRecordTypeProvider dataType; + + private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE)); + private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE)); + + public BeamRecord(BeamRecordTypeProvider dataType) { + this.dataType = dataType; + this.dataValues = new ArrayList<>(); + for (int idx = 0; idx < dataType.size(); ++idx) { + dataValues.add(null); + nullFields.add(idx); + } + } + + public BeamRecord(BeamRecordTypeProvider dataType, List<Object> dataValues) { + this(dataType); + for (int idx = 0; idx < dataValues.size(); ++idx) { + addField(idx, dataValues.get(idx)); + } + } + + public void updateWindowRange(BeamRecord upstreamRecord, BoundedWindow window){ + windowStart = upstreamRecord.windowStart; + windowEnd = upstreamRecord.windowEnd; + + if (window instanceof IntervalWindow) { + IntervalWindow iWindow = (IntervalWindow) window; + windowStart = iWindow.start(); + windowEnd = iWindow.end(); + } + } + + public void addField(String fieldName, Object fieldValue) { + addField(dataType.getFieldsName().indexOf(fieldName), fieldValue); + } + + public void addField(int index, Object fieldValue) { + if (fieldValue == null) { + return; + } else { + /* Validate before un-marking the field as null, so a rejected value leaves the record unchanged. */ dataType.validateValueType(index, fieldValue); + if (nullFields.contains(index)) { + nullFields.remove(nullFields.indexOf(index)); + } + } + + dataValues.set(index, fieldValue); + } + + public Object getFieldValue(String fieldName) { + return getFieldValue(dataType.getFieldsName().indexOf(fieldName)); + } + + public byte getByte(String fieldName) { + return (Byte) getFieldValue(fieldName); + } + + public short getShort(String fieldName) { + return (Short) getFieldValue(fieldName); + } + + public int getInteger(String fieldName) { + return (Integer) getFieldValue(fieldName); + } + + public float getFloat(String fieldName) { + return (Float)
getFieldValue(fieldName); + } + + public double getDouble(String fieldName) { + return (Double) getFieldValue(fieldName); + } + + public long getLong(String fieldName) { + return (Long) getFieldValue(fieldName); + } + + public String getString(String fieldName) { + return (String) getFieldValue(fieldName); + } + + public Date getDate(String fieldName) { + return (Date) getFieldValue(fieldName); + } + + public GregorianCalendar getGregorianCalendar(String fieldName) { + return (GregorianCalendar) getFieldValue(fieldName); + } + + public BigDecimal getBigDecimal(String fieldName) { + return (BigDecimal) getFieldValue(fieldName); + } + + public boolean getBoolean(String fieldName) { + return (boolean) getFieldValue(fieldName); + } + + public Object getFieldValue(int fieldIdx) { + if (nullFields.contains(fieldIdx)) { + return null; + } + + return dataValues.get(fieldIdx); + } + + public byte getByte(int idx) { + return (Byte) getFieldValue(idx); + } + + public short getShort(int idx) { + return (Short) getFieldValue(idx); + } + + public int getInteger(int idx) { + return (Integer) getFieldValue(idx); + } + + public float getFloat(int idx) { + return (Float) getFieldValue(idx); + } + + public double getDouble(int idx) { + return (Double) getFieldValue(idx); + } + + public long getLong(int idx) { + return (Long) getFieldValue(idx); + } + + public String getString(int idx) { + return (String) getFieldValue(idx); + } + + public Date getDate(int idx) { + return (Date) getFieldValue(idx); + } + + public GregorianCalendar getGregorianCalendar(int idx) { + return (GregorianCalendar) getFieldValue(idx); + } + + public BigDecimal getBigDecimal(int idx) { + return (BigDecimal) getFieldValue(idx); + } + + public boolean getBoolean(int idx) { + return (boolean) getFieldValue(idx); + } + + public int size() { + return dataValues.size(); + } + + public List<Object> getDataValues() { + return dataValues; + } + + public void setDataValues(List<Object> dataValues) { + this.dataValues =
dataValues; + } + + public BeamRecordTypeProvider getDataType() { + return dataType; + } + + public void setDataType(BeamRecordTypeProvider dataType) { + this.dataType = dataType; + } + + public void setNullFields(List<Integer> nullFields) { + this.nullFields = nullFields; + } + + public List<Integer> getNullFields() { + return nullFields; + } + + /** + * is the specified field NULL? + */ + public boolean isNull(int idx) { + return nullFields.contains(idx); + } + + public Instant getWindowStart() { + return windowStart; + } + + public Instant getWindowEnd() { + return windowEnd; + } + + public void setWindowStart(Instant windowStart) { + this.windowStart = windowStart; + } + + public void setWindowEnd(Instant windowEnd) { + this.windowEnd = windowEnd; + } + + @Override + public String toString() { + /* Label with the runtime class so BeamRecord and subclasses such as BeamSqlRow are named correctly. */ return getClass().getSimpleName() + " [nullFields=" + nullFields + ", dataValues=" + dataValues + ", dataType=" + + dataType + ", windowStart=" + windowStart + ", windowEnd=" + windowEnd + "]"; + } + + /** + * Return data fields as key=value.
+ */ + public String valueInString() { + StringBuilder sb = new StringBuilder(); + for (int idx = 0; idx < size(); ++idx) { + sb.append( + String.format(",%s=%s", getDataType().getFieldsName().get(idx), getFieldValue(idx))); + } + /* A record with no fields leaves sb empty, and substring(1) would throw. */ return sb.length() == 0 ? "" : sb.substring(1); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + BeamRecord other = (BeamRecord) obj; + return toString().equals(other.toString()); + } + + @Override public int hashCode() { + return 31 * (31 * getDataType().hashCode() + getDataValues().hashCode()) + + getNullFields().hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/52933a64/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordTypeProvider.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordTypeProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordTypeProvider.java new file mode 100644 index 0000000..63a961c --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordTypeProvider.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.values; + +import java.io.Serializable; +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; + +/** + * The default type provider used in {@link BeamRecord}. + */ +@Experimental +public class BeamRecordTypeProvider implements Serializable{ + private List<String> fieldsName; + + public BeamRecordTypeProvider(List<String> fieldsName) { + this.fieldsName = fieldsName; + } + + /** + * Validate input fieldValue for a field. + * @throws IllegalArgumentException throw exception when the validation fails. + */ + public void validateValueType(int index, Object fieldValue) + throws IllegalArgumentException{ + //do nothing by default. + } + + public List<String> getFieldsName(){ + return fieldsName; + } + + public String getFieldByIndex(int index){ + return fieldsName.get(index); + } + + public int findIndexOfField(String fieldName){ + return fieldsName.indexOf(fieldName); + } + + public int size(){ + return fieldsName.size(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/52933a64/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java index e0d7a78..0dabf40 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java @@ -167,7 +167,7 @@ public class BeamSql { BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder(); getSqlEnv().registerTable(sourceTag.getId(), - new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema())); + new BeamPCollectionTable(sourceStream, 
sourceCoder.getSqlRecordType())); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/52933a64/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java index 3bea46a..967dee5 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java @@ -56,8 +56,8 @@ public class BeamSqlCli { /** * compile SQL, and return a {@link Pipeline}. */ - public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, Pipeline basePipeline - , BeamSqlEnv sqlEnv) throws Exception{ + public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, Pipeline basePipeline, + BeamSqlEnv sqlEnv) throws Exception{ PCollection<BeamSqlRow> resultStream = sqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline, sqlEnv); return resultStream; http://git-wip-us.apache.org/repos/asf/beam/blob/52933a64/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java index 2e0efe8..cb5c7ea 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java @@ -17,298 +17,25 @@ */ package org.apache.beam.sdk.extensions.sql.schema; -import java.io.Serializable; -import java.math.BigDecimal; -import 
java.sql.Types; -import java.util.ArrayList; -import java.util.Date; -import java.util.GregorianCalendar; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.calcite.sql.type.SqlTypeName; -import org.joda.time.Instant; +import org.apache.beam.sdk.values.BeamRecord; +import org.apache.beam.sdk.values.PCollection; /** - * Represent a generic ROW record in Beam SQL. - * + * {@link BeamSqlRow} represents one row element in a {@link PCollection}, + * with type provider {@link BeamSqlRowType}. */ -public class BeamSqlRow implements Serializable { - private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>(); - static { - SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class); - - SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class); - - SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class); - - SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class); - - SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class); - } - - private List<Integer> nullFields = new ArrayList<>(); - private List<Object> dataValues; - private BeamSqlRowType dataType; - - private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE)); - private Instant windowEnd = new 
Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE)); - - public BeamSqlRow(BeamSqlRowType dataType) { - this.dataType = dataType; - this.dataValues = new ArrayList<>(); - for (int idx = 0; idx < dataType.size(); ++idx) { - dataValues.add(null); - nullFields.add(idx); - } - } - +public class BeamSqlRow extends BeamRecord { public BeamSqlRow(BeamSqlRowType dataType, List<Object> dataValues) { - this(dataType); - for (int idx = 0; idx < dataValues.size(); ++idx) { - addField(idx, dataValues.get(idx)); - } - } - - public void updateWindowRange(BeamSqlRow upstreamRecord, BoundedWindow window){ - windowStart = upstreamRecord.windowStart; - windowEnd = upstreamRecord.windowEnd; - - if (window instanceof IntervalWindow) { - IntervalWindow iWindow = (IntervalWindow) window; - windowStart = iWindow.start(); - windowEnd = iWindow.end(); - } - } - - public void addField(String fieldName, Object fieldValue) { - addField(dataType.getFieldsName().indexOf(fieldName), fieldValue); - } - - public void addField(int index, Object fieldValue) { - if (fieldValue == null) { - return; - } else { - if (nullFields.contains(index)) { - nullFields.remove(nullFields.indexOf(index)); - } - } - - validateValueType(index, fieldValue); - dataValues.set(index, fieldValue); - } - - private void validateValueType(int index, Object fieldValue) { - SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, index); - Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(CalciteUtils.toJavaType(fieldType)); - if (javaClazz == null) { - throw new UnsupportedOperationException("Data type: " + fieldType + " not supported yet!"); - } - - if (!fieldValue.getClass().equals(javaClazz)) { - throw new IllegalArgumentException( - String.format("[%s](%s) doesn't match type [%s]", - fieldValue, fieldValue.getClass(), fieldType) - ); - } - } - - public Object getFieldValue(String fieldName) { - return getFieldValue(dataType.getFieldsName().indexOf(fieldName)); - } - - public byte getByte(String fieldName) { - return 
(Byte) getFieldValue(fieldName); - } - - public short getShort(String fieldName) { - return (Short) getFieldValue(fieldName); - } - - public int getInteger(String fieldName) { - return (Integer) getFieldValue(fieldName); - } - - public float getFloat(String fieldName) { - return (Float) getFieldValue(fieldName); - } - - public double getDouble(String fieldName) { - return (Double) getFieldValue(fieldName); - } - - public long getLong(String fieldName) { - return (Long) getFieldValue(fieldName); - } - - public String getString(String fieldName) { - return (String) getFieldValue(fieldName); - } - - public Date getDate(String fieldName) { - return (Date) getFieldValue(fieldName); - } - - public GregorianCalendar getGregorianCalendar(String fieldName) { - return (GregorianCalendar) getFieldValue(fieldName); - } - - public BigDecimal getBigDecimal(String fieldName) { - return (BigDecimal) getFieldValue(fieldName); - } - - public boolean getBoolean(String fieldName) { - return (boolean) getFieldValue(fieldName); - } - - public Object getFieldValue(int fieldIdx) { - if (nullFields.contains(fieldIdx)) { - return null; - } - - return dataValues.get(fieldIdx); - } - - public byte getByte(int idx) { - return (Byte) getFieldValue(idx); + super(dataType, dataValues); } - public short getShort(int idx) { - return (Short) getFieldValue(idx); - } - - public int getInteger(int idx) { - return (Integer) getFieldValue(idx); - } - - public float getFloat(int idx) { - return (Float) getFieldValue(idx); - } - - public double getDouble(int idx) { - return (Double) getFieldValue(idx); - } - - public long getLong(int idx) { - return (Long) getFieldValue(idx); - } - - public String getString(int idx) { - return (String) getFieldValue(idx); - } - - public Date getDate(int idx) { - return (Date) getFieldValue(idx); - } - - public GregorianCalendar getGregorianCalendar(int idx) { - return (GregorianCalendar) getFieldValue(idx); - } - - public BigDecimal getBigDecimal(int idx) { - return 
(BigDecimal) getFieldValue(idx); - } - - public boolean getBoolean(int idx) { - return (boolean) getFieldValue(idx); - } - - public int size() { - return dataValues.size(); - } - - public List<Object> getDataValues() { - return dataValues; - } - - public void setDataValues(List<Object> dataValues) { - this.dataValues = dataValues; - } - - public BeamSqlRowType getDataType() { - return dataType; - } - - public void setDataType(BeamSqlRowType dataType) { - this.dataType = dataType; - } - - public void setNullFields(List<Integer> nullFields) { - this.nullFields = nullFields; - } - - public List<Integer> getNullFields() { - return nullFields; - } - - /** - * is the specified field NULL? - */ - public boolean isNull(int idx) { - return nullFields.contains(idx); - } - - public Instant getWindowStart() { - return windowStart; - } - - public Instant getWindowEnd() { - return windowEnd; - } - - public void setWindowStart(Instant windowStart) { - this.windowStart = windowStart; - } - - public void setWindowEnd(Instant windowEnd) { - this.windowEnd = windowEnd; - } - - @Override - public String toString() { - return "BeamSqlRow [nullFields=" + nullFields + ", dataValues=" + dataValues + ", dataType=" - + dataType + ", windowStart=" + windowStart + ", windowEnd=" + windowEnd + "]"; - } - - /** - * Return data fields as key=value. 
- */ - public String valueInString() { - StringBuilder sb = new StringBuilder(); - for (int idx = 0; idx < size(); ++idx) { - sb.append(String.format(",%s=%s", dataType.getFieldsName().get(idx), getFieldValue(idx))); - } - return sb.substring(1); + public BeamSqlRow(BeamSqlRowType dataType) { + super(dataType); } @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - BeamSqlRow other = (BeamSqlRow) obj; - return toString().equals(other.toString()); - } - - @Override public int hashCode() { - return 31 * (31 * dataType.hashCode() + dataValues.hashCode()) + nullFields.hashCode(); + public BeamSqlRowType getDataType() { + return (BeamSqlRowType) super.getDataType(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/52933a64/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java index bf097d4..3d760c4 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.schema; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.sql.Types; import java.util.Date; import java.util.GregorianCalendar; import java.util.List; @@ -34,13 +35,12 @@ import org.apache.beam.sdk.coders.DoubleCoder; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import 
org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; /** * A {@link Coder} encodes {@link BeamSqlRow}. */ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> { - private BeamSqlRowType tableSchema; + private BeamSqlRowType sqlRecordType; private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of()); @@ -52,58 +52,59 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> { private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of(); private static final ByteCoder byteCoder = ByteCoder.of(); - public BeamSqlRowCoder(BeamSqlRowType tableSchema) { - this.tableSchema = tableSchema; + public BeamSqlRowCoder(BeamSqlRowType sqlRecordType) { + this.sqlRecordType = sqlRecordType; } @Override - public void encode(BeamSqlRow value, OutputStream outStream) throws CoderException, IOException { + public void encode(BeamSqlRow value, OutputStream outStream) + throws CoderException, IOException { listCoder.encode(value.getNullFields(), outStream); for (int idx = 0; idx < value.size(); ++idx) { if (value.getNullFields().contains(idx)) { continue; } - switch (CalciteUtils.getFieldType(value.getDataType(), idx)) { - case INTEGER: + switch (sqlRecordType.getFieldsType().get(idx)) { + case Types.INTEGER: intCoder.encode(value.getInteger(idx), outStream); break; - case SMALLINT: + case Types.SMALLINT: intCoder.encode((int) value.getShort(idx), outStream); break; - case TINYINT: + case Types.TINYINT: byteCoder.encode(value.getByte(idx), outStream); break; - case DOUBLE: + case Types.DOUBLE: doubleCoder.encode(value.getDouble(idx), outStream); break; - case FLOAT: + case Types.FLOAT: doubleCoder.encode((double) value.getFloat(idx), outStream); break; - case DECIMAL: + case Types.DECIMAL: bigDecimalCoder.encode(value.getBigDecimal(idx), outStream); break; - case BIGINT: + case Types.BIGINT: longCoder.encode(value.getLong(idx), outStream); break; - case VARCHAR: - case CHAR: + case Types.VARCHAR: + case Types.CHAR: 
stringCoder.encode(value.getString(idx), outStream); break; - case TIME: + case Types.TIME: longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(), outStream); break; - case DATE: - case TIMESTAMP: + case Types.DATE: + case Types.TIMESTAMP: longCoder.encode(value.getDate(idx).getTime(), outStream); break; - case BOOLEAN: + case Types.BOOLEAN: byteCoder.encode((byte) (value.getBoolean(idx) ? 1 : 0), outStream); break; default: throw new UnsupportedOperationException( - "Data type: " + value.getDataType().getFieldsType().get(idx) + " not supported yet!"); + "Data type: " + sqlRecordType.getFieldsType().get(idx) + " not supported yet!"); } } @@ -115,55 +116,55 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> { public BeamSqlRow decode(InputStream inStream) throws CoderException, IOException { List<Integer> nullFields = listCoder.decode(inStream); - BeamSqlRow record = new BeamSqlRow(tableSchema); + BeamSqlRow record = new BeamSqlRow(sqlRecordType); record.setNullFields(nullFields); - for (int idx = 0; idx < tableSchema.size(); ++idx) { + for (int idx = 0; idx < sqlRecordType.size(); ++idx) { if (nullFields.contains(idx)) { continue; } - switch (CalciteUtils.getFieldType(tableSchema, idx)) { - case INTEGER: + switch (sqlRecordType.getFieldsType().get(idx)) { + case Types.INTEGER: record.addField(idx, intCoder.decode(inStream)); break; - case SMALLINT: + case Types.SMALLINT: record.addField(idx, intCoder.decode(inStream).shortValue()); break; - case TINYINT: + case Types.TINYINT: record.addField(idx, byteCoder.decode(inStream)); break; - case DOUBLE: + case Types.DOUBLE: record.addField(idx, doubleCoder.decode(inStream)); break; - case FLOAT: + case Types.FLOAT: record.addField(idx, doubleCoder.decode(inStream).floatValue()); break; - case BIGINT: + case Types.BIGINT: record.addField(idx, longCoder.decode(inStream)); break; - case DECIMAL: + case Types.DECIMAL: record.addField(idx, bigDecimalCoder.decode(inStream)); break; - case VARCHAR: - 
case CHAR: + case Types.VARCHAR: + case Types.CHAR: record.addField(idx, stringCoder.decode(inStream)); break; - case TIME: + case Types.TIME: GregorianCalendar calendar = new GregorianCalendar(); calendar.setTime(new Date(longCoder.decode(inStream))); record.addField(idx, calendar); break; - case DATE: - case TIMESTAMP: + case Types.DATE: + case Types.TIMESTAMP: record.addField(idx, new Date(longCoder.decode(inStream))); break; - case BOOLEAN: + case Types.BOOLEAN: record.addField(idx, byteCoder.decode(inStream) == 1); break; default: throw new UnsupportedOperationException("Data type: " - + CalciteUtils.toCalciteType(tableSchema.getFieldsType().get(idx)) + + sqlRecordType.getFieldsType().get(idx) + " not supported yet!"); } } @@ -174,8 +175,8 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> { return record; } - public BeamSqlRowType getTableSchema() { - return tableSchema; + public BeamSqlRowType getSqlRecordType() { + return sqlRecordType; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/52933a64/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java index 018fe81..7584dad 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java @@ -17,24 +17,93 @@ */ package org.apache.beam.sdk.extensions.sql.schema; -import com.google.auto.value.AutoValue; -import java.io.Serializable; +import java.math.BigDecimal; +import java.sql.Types; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.HashMap; import java.util.List; +import 
java.util.Map; +import org.apache.beam.sdk.values.BeamRecordTypeProvider; /** - * Field type information in {@link BeamSqlRow}. + * Type provider for {@link BeamSqlRow} with SQL types. + * + * <p>Limited SQL types are supported now, visit + * <a href="https://beam.apache.org/blog/2017/07/21/sql-dsl.html#data-type">data types</a> + * for more details. * */ -@AutoValue -public abstract class BeamSqlRowType implements Serializable { - public abstract List<String> getFieldsName(); - public abstract List<Integer> getFieldsType(); +public class BeamSqlRowType extends BeamRecordTypeProvider { + private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>(); + static { + SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class); + + SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class); + + SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class); + + SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class); + + SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class); + } + + public List<Integer> fieldsType; + + protected BeamSqlRowType(List<String> fieldsName) { + super(fieldsName); + } + + public BeamSqlRowType(List<String> fieldsName, List<Integer> fieldsType) { + super(fieldsName); + this.fieldsType = fieldsType; + } + + public static BeamSqlRowType create(List<String> fieldNames, + List<Integer> fieldTypes) { + return new BeamSqlRowType(fieldNames, fieldTypes); + } + + @Override + public void validateValueType(int index, Object fieldValue) throws IllegalArgumentException { + int fieldType = 
fieldsType.get(index); + Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(fieldType); + if (javaClazz == null) { + throw new IllegalArgumentException("Data type: " + fieldType + " not supported yet!"); + } + + if (!fieldValue.getClass().equals(javaClazz)) { + throw new IllegalArgumentException( + String.format("[%s](%s) doesn't match type [%s]", + fieldValue, fieldValue.getClass(), fieldType) + ); + } + } + + public List<Integer> getFieldsType() { + return fieldsType; + } - public static BeamSqlRowType create(List<String> fieldNames, List<Integer> fieldTypes) { - return new AutoValue_BeamSqlRowType(fieldNames, fieldTypes); + @Override + public boolean equals(Object obj) { + if (obj != null && obj instanceof BeamSqlRowType) { + BeamSqlRowType ins = (BeamSqlRowType) obj; + return fieldsType.equals(ins.getFieldsType()) && getFieldsName().equals(ins.getFieldsName()); + } else { + return false; + } } - public int size() { - return getFieldsName().size(); + @Override + public int hashCode() { + return 31 * getFieldsName().hashCode() + getFieldsType().hashCode(); } }
