Use BitSet instead of List<Integer> for nullFields
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/129ae969 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/129ae969 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/129ae969 Branch: refs/heads/DSL_SQL Commit: 129ae9696af6a2f8d83ee962ca2ba8a7d6e3fd40 Parents: 52933a6 Author: mingmxu <[email protected]> Authored: Thu Aug 3 00:43:46 2017 -0700 Committer: mingmxu <[email protected]> Committed: Thu Aug 3 00:43:46 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/coders/BeamRecordCoder.java | 18 +++++++++++---- .../org/apache/beam/sdk/values/BeamRecord.java | 24 ++++++++++---------- .../extensions/sql/schema/BeamSqlRowCoder.java | 14 ++++++------ 3 files changed, 32 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/129ae969/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java index ad27f4e..27f92ce 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.coders; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.BitSet; import java.util.List; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.values.BeamRecord; @@ -30,23 +31,30 @@ import org.apache.beam.sdk.values.BeamRecordTypeProvider; */ @Experimental public class BeamRecordCoder extends CustomCoder<BeamRecord> { - private static final ListCoder<Integer> nullListCoder = 
ListCoder.of(BigEndianIntegerCoder.of()); + private static final BitSetCoder nullListCoder = BitSetCoder.of(); private static final InstantCoder instantCoder = InstantCoder.of(); private BeamRecordTypeProvider recordType; private List<Coder> coderArray; - public BeamRecordCoder(BeamRecordTypeProvider recordType, List<Coder> coderArray) { + private BeamRecordCoder(BeamRecordTypeProvider recordType, List<Coder> coderArray) { this.recordType = recordType; this.coderArray = coderArray; } + public static BeamRecordCoder of(BeamRecordTypeProvider recordType, List<Coder> coderArray){ + if (recordType.size() != coderArray.size()) { + throw new IllegalArgumentException("Coder size doesn't match with field size"); + } + return new BeamRecordCoder(recordType, coderArray); + } + @Override public void encode(BeamRecord value, OutputStream outStream) throws CoderException, IOException { nullListCoder.encode(value.getNullFields(), outStream); for (int idx = 0; idx < value.size(); ++idx) { - if (value.getNullFields().contains(idx)) { + if (value.getNullFields().get(idx)) { continue; } @@ -59,12 +67,12 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> { @Override public BeamRecord decode(InputStream inStream) throws CoderException, IOException { - List<Integer> nullFields = nullListCoder.decode(inStream); + BitSet nullFields = nullListCoder.decode(inStream); BeamRecord record = new BeamRecord(recordType); record.setNullFields(nullFields); for (int idx = 0; idx < recordType.size(); ++idx) { - if (nullFields.contains(idx)) { + if (nullFields.get(idx)) { continue; } http://git-wip-us.apache.org/repos/asf/beam/blob/129ae969/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java index d1c1c17..476233e 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.values; import java.io.Serializable; import java.math.BigDecimal; import java.util.ArrayList; +import java.util.BitSet; import java.util.Date; import java.util.GregorianCalendar; import java.util.List; @@ -36,9 +37,9 @@ import org.joda.time.Instant; */ @Experimental public class BeamRecord implements Serializable { - //null values are indexed here, to handle properly in Coder. - private List<Integer> nullFields = new ArrayList<>(); private List<Object> dataValues; + //null values are indexed here, to handle properly in Coder. + private BitSet nullFields; private BeamRecordTypeProvider dataType; private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE)); @@ -46,10 +47,11 @@ public class BeamRecord implements Serializable { public BeamRecord(BeamRecordTypeProvider dataType) { this.dataType = dataType; + this.nullFields = new BitSet(dataType.size()); this.dataValues = new ArrayList<>(); for (int idx = 0; idx < dataType.size(); ++idx) { dataValues.add(null); - nullFields.add(idx); + nullFields.set(idx); } } @@ -79,9 +81,7 @@ public class BeamRecord implements Serializable { if (fieldValue == null) { return; } else { - if (nullFields.contains(index)) { - nullFields.remove(nullFields.indexOf(index)); - } + nullFields.clear(index); } dataType.validateValueType(index, fieldValue); @@ -137,7 +137,7 @@ public class BeamRecord implements Serializable { } public Object getFieldValue(int fieldIdx) { - if (nullFields.contains(fieldIdx)) { + if (nullFields.get(fieldIdx)) { return null; } @@ -208,19 +208,19 @@ public class BeamRecord implements Serializable { this.dataType = dataType; } - public void setNullFields(List<Integer> nullFields) { - this.nullFields = nullFields; + public BitSet getNullFields() { + return nullFields; } - public List<Integer> 
getNullFields() { - return nullFields; + public void setNullFields(BitSet nullFields) { + this.nullFields = nullFields; } /** * is the specified field NULL? */ public boolean isNull(int idx) { - return nullFields.contains(idx); + return nullFields.get(idx); } public Instant getWindowStart() { http://git-wip-us.apache.org/repos/asf/beam/blob/129ae969/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java index 3d760c4..c7656af 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java @@ -21,19 +21,19 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.sql.Types; +import java.util.BitSet; import java.util.Date; import java.util.GregorianCalendar; -import java.util.List; import org.apache.beam.sdk.coders.BigDecimalCoder; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.coders.BitSetCoder; import org.apache.beam.sdk.coders.ByteCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.DoubleCoder; import org.apache.beam.sdk.coders.InstantCoder; -import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; /** @@ -42,7 +42,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> { private BeamSqlRowType sqlRecordType; - private 
static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of()); + private static final BitSetCoder nullListCoder = BitSetCoder.of(); private static final StringUtf8Coder stringCoder = StringUtf8Coder.of(); private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of(); @@ -59,9 +59,9 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> { @Override public void encode(BeamSqlRow value, OutputStream outStream) throws CoderException, IOException { - listCoder.encode(value.getNullFields(), outStream); + nullListCoder.encode(value.getNullFields(), outStream); for (int idx = 0; idx < value.size(); ++idx) { - if (value.getNullFields().contains(idx)) { + if (value.getNullFields().get(idx)) { continue; } @@ -114,12 +114,12 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> { @Override public BeamSqlRow decode(InputStream inStream) throws CoderException, IOException { - List<Integer> nullFields = listCoder.decode(inStream); + BitSet nullFields = nullListCoder.decode(inStream); BeamSqlRow record = new BeamSqlRow(sqlRecordType); record.setNullFields(nullFields); for (int idx = 0; idx < sqlRecordType.size(); ++idx) { - if (nullFields.contains(idx)) { + if (nullFields.get(idx)) { continue; }
