This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new c503ba9 Write null byte when indexing numeric dimensions with Hadoop
(#7020)
c503ba9 is described below
commit c503ba97799e3531753c5a26ef9c6f06c0e7e13e
Author: Ferris Tseng <[email protected]>
AuthorDate: Mon Mar 11 21:02:03 2019 -0400
Write null byte when indexing numeric dimensions with Hadoop (#7020)
* write null byte in hadoop indexing for numeric dimensions
* Add test case to check output serializing null numeric dimensions
* Remove extra line
* Add @Nullable annotations
---
.../org/apache/druid/indexer/InputRowSerde.java | 59 ++++++++++++++--------
.../apache/druid/indexer/InputRowSerdeTest.java | 55 ++++++++++++++++++++
2 files changed, 93 insertions(+), 21 deletions(-)
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java
index b9c48f4..c0a42f2 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java
@@ -51,6 +51,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
/**
*/
@@ -63,6 +64,34 @@ public class InputRowSerde
  private static final IndexSerdeTypeHelper FLOAT_HELPER = new FloatIndexSerdeTypeHelper();
  private static final IndexSerdeTypeHelper DOUBLE_HELPER = new DoubleIndexSerdeTypeHelper();
+ private static <T extends Number> void writeNullableNumeric(
+ T ret,
+ final ByteArrayDataOutput out,
+ final Supplier<T> getDefault,
+ final Consumer<T> write)
+ {
+ if (ret == null) {
+ ret = getDefault.get();
+ }
+
+ // Write the null byte only if the default numeric value is still null.
+ if (ret == null) {
+ out.writeByte(NullHandling.IS_NULL_BYTE);
+ return;
+ }
+
+ if (NullHandling.sqlCompatible()) {
+ out.writeByte(NullHandling.IS_NOT_NULL_BYTE);
+ }
+
+ write.accept(ret);
+ }
+
+ private static boolean isNullByteSet(final ByteArrayDataInput in)
+ {
+    return NullHandling.sqlCompatible() && in.readByte() == NullHandling.IS_NULL_BYTE;
+ }
+
public interface IndexSerdeTypeHelper<T>
{
ValueType getType();
@@ -175,12 +204,7 @@ public class InputRowSerde
exceptionToThrow = pe;
}
-    if (ret == null) {
-      // remove null -> zero conversion when https://github.com/apache/incubator-druid/pull/5278 series of patches is merged
-      // we'll also need to change the serialized encoding so that it can represent numeric nulls
-      ret = DimensionHandlerUtils.ZERO_LONG;
-    }
-    out.writeLong(ret);
+    writeNullableNumeric(ret, out, NullHandling::defaultLongValue, out::writeLong);
if (exceptionToThrow != null) {
throw exceptionToThrow;
@@ -188,9 +212,10 @@ public class InputRowSerde
}
@Override
+ @Nullable
public Long deserialize(ByteArrayDataInput in)
{
- return in.readLong();
+ return isNullByteSet(in) ? null : in.readLong();
}
}
@@ -214,12 +239,7 @@ public class InputRowSerde
exceptionToThrow = pe;
}
-    if (ret == null) {
-      // remove null -> zero conversion when https://github.com/apache/incubator-druid/pull/5278 series of patches is merged
-      // we'll also need to change the serialized encoding so that it can represent numeric nulls
-      ret = DimensionHandlerUtils.ZERO_FLOAT;
-    }
-    out.writeFloat(ret);
+    writeNullableNumeric(ret, out, NullHandling::defaultFloatValue, out::writeFloat);
if (exceptionToThrow != null) {
throw exceptionToThrow;
@@ -227,9 +247,10 @@ public class InputRowSerde
}
@Override
+ @Nullable
public Float deserialize(ByteArrayDataInput in)
{
- return in.readFloat();
+ return isNullByteSet(in) ? null : in.readFloat();
}
}
@@ -253,12 +274,7 @@ public class InputRowSerde
exceptionToThrow = pe;
}
-    if (ret == null) {
-      // remove null -> zero conversion when https://github.com/apache/incubator-druid/pull/5278 series of patches is merged
-      // we'll also need to change the serialized encoding so that it can represent numeric nulls
-      ret = DimensionHandlerUtils.ZERO_DOUBLE;
-    }
-    out.writeDouble(ret);
+    writeNullableNumeric(ret, out, NullHandling::defaultDoubleValue, out::writeDouble);
if (exceptionToThrow != null) {
throw exceptionToThrow;
@@ -266,9 +282,10 @@ public class InputRowSerde
}
@Override
+ @Nullable
public Double deserialize(ByteArrayDataInput in)
{
- return in.readDouble();
+ return isNullByteSet(in) ? null : in.readDouble();
}
}
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/InputRowSerdeTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/InputRowSerdeTest.java
index ce9b95c..3d00c15 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/InputRowSerdeTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/InputRowSerdeTest.java
@@ -254,4 +254,59 @@ public class InputRowSerdeTest
result.getParseExceptionMessages()
);
}
+
+ @Test
+ public void testDimensionNullOrDefaultForNumerics()
+ {
+ HashMap<String, Object> eventWithNulls = new HashMap<>();
+ eventWithNulls.put("d1", null);
+ eventWithNulls.put("d2", Arrays.asList("d2v1", "d2v2"));
+ eventWithNulls.put("d3", null);
+ eventWithNulls.put("d4", null);
+ eventWithNulls.put("d5", null);
+
+ InputRow in = new MapBasedInputRow(
+ timestamp,
+ dims,
+ eventWithNulls
+ );
+
+ DimensionsSpec dimensionsSpec = new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("d1"),
+ new StringDimensionSchema("d2"),
+ new LongDimensionSchema("d3"),
+ new FloatDimensionSchema("d4"),
+ new DoubleDimensionSchema("d5")
+ ),
+ null,
+ null
+ );
+
+    byte[] result = InputRowSerde.toBytes(InputRowSerde.getTypeHelperMap(dimensionsSpec), in, new AggregatorFactory[0]).getSerializedRow();
+
+ if (NullHandling.replaceWithDefault()) {
+ long expected = 0;
+ expected += 9; // timestamp bytes + dims length
+ expected += 18; // dim_non_existing writes: 1 16 1 bytes
+ expected += 4; // d1: writes 1 2 1 bytes
+ expected += 14; // d2: writes 1 2 1 1 4 1 4 bytes
+ expected += 11; // d3: writes 1 2 8 bytes
+ expected += 7; // d4: writes 1 2 4 bytes
+ expected += 11; // d5: writes 1 2 8 bytes
+ expected += 1; // writes aggregator length
+
+ Assert.assertEquals(expected, result.length);
+      Assert.assertArrayEquals(new byte[] {0, 0, 0, 0, 0, 0, 0, 0}, Arrays.copyOfRange(result, 48, 56));
+      Assert.assertArrayEquals(new byte[] {0, 0, 0, 0}, Arrays.copyOfRange(result, 59, 63));
+      Assert.assertArrayEquals(new byte[] {0, 0, 0, 0, 0, 0, 0, 0}, Arrays.copyOfRange(result, 66, 74));
+ } else {
+ long expected = 9 + 18 + 4 + 14 + 4 + 4 + 4 + 1;
+
+ Assert.assertEquals(expected, result.length);
+ Assert.assertEquals(result[48], NullHandling.IS_NULL_BYTE);
+ Assert.assertEquals(result[52], NullHandling.IS_NULL_BYTE);
+ Assert.assertEquals(result[56], NullHandling.IS_NULL_BYTE);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]