Repository: beam Updated Branches: refs/heads/DSL_SQL 79880b6ab -> 880531aa2
[rebased] remove nullFields in BeamRecord Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4871edbc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4871edbc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4871edbc Branch: refs/heads/DSL_SQL Commit: 4871edbc430a2e360ba59f10eeafbe2205d47ec1 Parents: 79880b6 Author: mingmxu <ming...@ebay.com> Authored: Mon Aug 7 14:36:36 2017 -0700 Committer: mingmxu <ming...@ebay.com> Committed: Mon Aug 7 14:36:36 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/coders/BeamRecordCoder.java | 15 ++++++++++++- .../org/apache/beam/sdk/values/BeamRecord.java | 23 +++----------------- .../extensions/sql/impl/rel/BeamSortRel.java | 4 ---- .../sql/schema/BeamSqlRecordType.java | 4 ++++ 4 files changed, 21 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4871edbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java index fe9c295..40b9f3f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java @@ -55,7 +55,7 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> { @Override public void encode(BeamRecord value, OutputStream outStream) throws CoderException, IOException { - nullListCoder.encode(value.getNullFields(), outStream); + nullListCoder.encode(scanNullFields(value), outStream); for (int idx = 0; idx < value.size(); ++idx) { if (value.isNull(idx)) { continue; @@ -81,6 +81,19 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> { return record; } + /** + * Scan {@link BeamRecord} to find fields with a NULL value. + */ + private BitSet scanNullFields(BeamRecord record){ + BitSet nullFields = new BitSet(record.size()); + for (int idx = 0; idx < record.size(); ++idx) { + if (record.isNull(idx)) { + nullFields.set(idx); + } + } + return nullFields; + } + @Override public void verifyDeterministic() throws org.apache.beam.sdk.coders.Coder.NonDeterministicException { http://git-wip-us.apache.org/repos/asf/beam/blob/4871edbc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java index 8d0aa42..6cbd11b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.values; import java.io.Serializable; import java.math.BigDecimal; import java.util.ArrayList; -import java.util.BitSet; import java.util.Date; import java.util.GregorianCalendar; import java.util.List; @@ -34,17 +33,13 @@ import org.apache.beam.sdk.annotations.Experimental; @Experimental public class BeamRecord implements Serializable { private List<Object> dataValues; - //null values are indexed here, to handle properly in Coder. - private BitSet nullFields; private BeamRecordType dataType; public BeamRecord(BeamRecordType dataType) { this.dataType = dataType; - this.nullFields = new BitSet(dataType.size()); this.dataValues = new ArrayList<>(); for (int idx = 0; idx < dataType.size(); ++idx) { dataValues.add(null); - nullFields.set(idx); } } @@ -60,12 +55,6 @@ public class BeamRecord implements Serializable { } public void addField(int index, Object fieldValue) { - if (fieldValue == null) { - return; - } else { - nullFields.clear(index); - } - dataType.validateValueType(index, fieldValue); dataValues.set(index, fieldValue); } @@ -182,21 +171,16 @@ public class BeamRecord implements Serializable { return dataType; } - public BitSet getNullFields() { - return nullFields; - } - /** * is the specified field NULL? */ public boolean isNull(int idx) { - return nullFields.get(idx); + return null == getFieldValue(idx); } @Override public String toString() { - return "BeamSqlRow [nullFields=" + nullFields + ", dataValues=" + dataValues + ", dataType=" - + dataType + "]"; + return "BeamSqlRow [dataValues=" + dataValues + ", dataType=" + dataType + "]"; } /** @@ -227,7 +211,6 @@ public class BeamRecord implements Serializable { } @Override public int hashCode() { - return 31 * (31 * getDataType().hashCode() + getDataValues().hashCode()) - + getNullFields().hashCode(); + return 31 * getDataType().hashCode() + getDataValues().hashCode(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/4871edbc/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java index 0cbea5c..e98ead1 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java @@ -229,8 +229,4 @@ public class BeamSortRel extends Sort implements BeamRelNode { return 0; } } - - public static <T extends Comparable> int compare(T a, T b) { - return a.compareTo(b); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/4871edbc/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java index b295049..fe82834 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java @@ -133,6 +133,10 @@ public class BeamSqlRecordType extends BeamRecordType { @Override public void validateValueType(int index, Object fieldValue) throws IllegalArgumentException { + if (null == fieldValue) {// no need to do type check for NULL value + return; + } + int fieldType = fieldsType.get(index); Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(fieldType); if (javaClazz == null) {