This is an automated email from the ASF dual-hosted git repository. cwylie pushed a commit to branch 0.14.1-incubating in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
commit 51461e9dd72a8b33dafa2782c6dd507d59373394 Author: Ferris Tseng <[email protected]> AuthorDate: Mon Mar 11 21:02:03 2019 -0400 Write null byte when indexing numeric dimensions with Hadoop (#7020) * write null byte in hadoop indexing for numeric dimensions * Add test case to check output serializing null numeric dimensions * Remove extra line * Add @Nullable annotations --- .../org/apache/druid/indexer/InputRowSerde.java | 59 ++++++++++++++-------- .../apache/druid/indexer/InputRowSerdeTest.java | 55 ++++++++++++++++++++ 2 files changed, 93 insertions(+), 21 deletions(-) diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java index b9c48f4..c0a42f2 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java @@ -51,6 +51,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Consumer; /** */ @@ -63,6 +64,34 @@ public class InputRowSerde private static final IndexSerdeTypeHelper FLOAT_HELPER = new FloatIndexSerdeTypeHelper(); private static final IndexSerdeTypeHelper DOUBLE_HELPER = new DoubleIndexSerdeTypeHelper(); + private static <T extends Number> void writeNullableNumeric( + T ret, + final ByteArrayDataOutput out, + final Supplier<T> getDefault, + final Consumer<T> write) + { + if (ret == null) { + ret = getDefault.get(); + } + + // Write the null byte only if the default numeric value is still null. + if (ret == null) { + out.writeByte(NullHandling.IS_NULL_BYTE); + return; + } + + if (NullHandling.sqlCompatible()) { + out.writeByte(NullHandling.IS_NOT_NULL_BYTE); + } + + write.accept(ret); + } + + private static boolean isNullByteSet(final ByteArrayDataInput in) + { + return NullHandling.sqlCompatible() && in.readByte() == NullHandling.IS_NULL_BYTE; + } + public interface IndexSerdeTypeHelper<T> { ValueType getType(); @@ -175,12 +204,7 @@ public class InputRowSerde exceptionToThrow = pe; } - if (ret == null) { - // remove null -> zero conversion when https://github.com/apache/incubator-druid/pull/5278 series of patches is merged - // we'll also need to change the serialized encoding so that it can represent numeric nulls - ret = DimensionHandlerUtils.ZERO_LONG; - } - out.writeLong(ret); + writeNullableNumeric(ret, out, NullHandling::defaultLongValue, out::writeLong); if (exceptionToThrow != null) { throw exceptionToThrow; @@ -188,9 +212,10 @@ public class InputRowSerde } @Override + @Nullable public Long deserialize(ByteArrayDataInput in) { - return in.readLong(); + return isNullByteSet(in) ? null : in.readLong(); } } @@ -214,12 +239,7 @@ public class InputRowSerde exceptionToThrow = pe; } - if (ret == null) { - // remove null -> zero conversion when https://github.com/apache/incubator-druid/pull/5278 series of patches is merged - // we'll also need to change the serialized encoding so that it can represent numeric nulls - ret = DimensionHandlerUtils.ZERO_FLOAT; - } - out.writeFloat(ret); + writeNullableNumeric(ret, out, NullHandling::defaultFloatValue, out::writeFloat); if (exceptionToThrow != null) { throw exceptionToThrow; @@ -227,9 +247,10 @@ public class InputRowSerde } @Override + @Nullable public Float deserialize(ByteArrayDataInput in) { - return in.readFloat(); + return isNullByteSet(in) ? null : in.readFloat(); } } @@ -253,12 +274,7 @@ public class InputRowSerde exceptionToThrow = pe; } - if (ret == null) { - // remove null -> zero conversion when https://github.com/apache/incubator-druid/pull/5278 series of patches is merged - // we'll also need to change the serialized encoding so that it can represent numeric nulls - ret = DimensionHandlerUtils.ZERO_DOUBLE; - } - out.writeDouble(ret); + writeNullableNumeric(ret, out, NullHandling::defaultDoubleValue, out::writeDouble); if (exceptionToThrow != null) { throw exceptionToThrow; @@ -266,9 +282,10 @@ public class InputRowSerde } @Override + @Nullable public Double deserialize(ByteArrayDataInput in) { - return in.readDouble(); + return isNullByteSet(in) ? null : in.readDouble(); } } diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/InputRowSerdeTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/InputRowSerdeTest.java index ce9b95c..3d00c15 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/InputRowSerdeTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/InputRowSerdeTest.java @@ -254,4 +254,59 @@ public class InputRowSerdeTest result.getParseExceptionMessages() ); } + + @Test + public void testDimensionNullOrDefaultForNumerics() + { + HashMap<String, Object> eventWithNulls = new HashMap<>(); + eventWithNulls.put("d1", null); + eventWithNulls.put("d2", Arrays.asList("d2v1", "d2v2")); + eventWithNulls.put("d3", null); + eventWithNulls.put("d4", null); + eventWithNulls.put("d5", null); + + InputRow in = new MapBasedInputRow( + timestamp, + dims, + eventWithNulls + ); + + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("d1"), + new StringDimensionSchema("d2"), + new LongDimensionSchema("d3"), + new FloatDimensionSchema("d4"), + new DoubleDimensionSchema("d5") + ), + null, + null + ); + + byte[] result = InputRowSerde.toBytes(InputRowSerde.getTypeHelperMap(dimensionsSpec), in, new AggregatorFactory[0]).getSerializedRow(); + + if (NullHandling.replaceWithDefault()) { + long expected = 0; + expected += 9; // timestamp bytes + dims length + expected += 18; // dim_non_existing writes: 1 16 1 bytes + expected += 4; // d1: writes 1 2 1 bytes + expected += 14; // d2: writes 1 2 1 1 4 1 4 bytes + expected += 11; // d3: writes 1 2 8 bytes + expected += 7; // d4: writes 1 2 4 bytes + expected += 11; // d5: writes 1 2 8 bytes + expected += 1; // writes aggregator length + + Assert.assertEquals(expected, result.length); + Assert.assertArrayEquals(new byte[] {0, 0, 0, 0, 0, 0, 0, 0}, Arrays.copyOfRange(result, 48, 56)); + Assert.assertArrayEquals(new byte[] {0, 0, 0, 0}, Arrays.copyOfRange(result, 59, 63)); + Assert.assertArrayEquals(new byte[] {0, 0, 0, 0, 0, 0, 0, 0}, Arrays.copyOfRange(result, 66, 74)); + } else { + long expected = 9 + 18 + 4 + 14 + 4 + 4 + 4 + 1; + + Assert.assertEquals(expected, result.length); + Assert.assertEquals(result[48], NullHandling.IS_NULL_BYTE); + Assert.assertEquals(result[52], NullHandling.IS_NULL_BYTE); + Assert.assertEquals(result[56], NullHandling.IS_NULL_BYTE); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
