This is an automated email from the ASF dual-hosted git repository. alsay pushed a commit to branch datasketches-4.2.0 in repository https://gitbox.apache.org/repos/asf/druid.git
commit fa5daf5217390bea163bebe1c9cfe80a2ed6ac38 Author: AlexanderSaydakov <[email protected]> AuthorDate: Mon Oct 23 17:22:32 2023 -0700 use datasketches-java 4.2.0 --- .../kll/KllDoublesSketchAggregatorFactory.java | 3 +- .../kll/KllFloatsSketchAggregatorFactory.java | 3 +- .../KllDoublesSketchComplexMetricSerdeTest.java | 6 +- .../kll/KllFloatsSketchComplexMetricSerdeTest.java | 4 +- .../QuantilesSketchKeyCollectorFactory.java | 55 +++++++++++++-- .../distribution/ArrayOfStringTuplesSerDe.java | 80 +++++++++++++++++++--- .../distribution/ArrayOfStringsNullSafeSerde.java | 3 +- licenses.yaml | 2 +- pom.xml | 2 +- 9 files changed, 133 insertions(+), 25 deletions(-) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java index 815227adf55..8c6dacbdf39 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java @@ -22,6 +22,7 @@ package org.apache.druid.query.aggregation.datasketches.kll; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.datasketches.kll.KllDoublesSketch; +import org.apache.datasketches.kll.KllSketch.SketchType; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException; import org.apache.druid.query.aggregation.AggregatorUtil; @@ -124,7 +125,7 @@ public class KllDoublesSketchAggregatorFactory extends KllSketchAggregatorFactor @Override int getMaxSerializedSizeBytes(final int k, final long n) { - return KllDoublesSketch.getMaxSerializedSizeBytes(k, n, true); + return KllDoublesSketch.getMaxSerializedSizeBytes(k, n, SketchType.DOUBLES_SKETCH, true); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java index 9cc61524615..5620921621f 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java @@ -22,6 +22,7 @@ package org.apache.druid.query.aggregation.datasketches.kll; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.datasketches.kll.KllFloatsSketch; +import org.apache.datasketches.kll.KllSketch.SketchType; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException; import org.apache.druid.query.aggregation.AggregatorUtil; @@ -124,7 +125,7 @@ public class KllFloatsSketchAggregatorFactory extends KllSketchAggregatorFactory @Override int getMaxSerializedSizeBytes(final int k, final long n) { - return KllFloatsSketch.getMaxSerializedSizeBytes(k, n, true); + return KllFloatsSketch.getMaxSerializedSizeBytes(k, n, SketchType.FLOATS_SKETCH, true); } @Override diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java index d0a26307990..730fb54c541 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchComplexMetricSerdeTest.java @@ -114,7 +114,7 @@ public class KllDoublesSketchComplexMetricSerdeTest objectStrategy.fromByteBufferSafe(buf, bytes.length).toByteArray(); // corrupted sketch should fail with a regular java buffer exception, not all subsets actually fail with the same - // index out of bounds exceptions, but at least this many do + // sketches exceptions, but at least this many do for (int subset = 3; subset < 24; subset++) { final byte[] garbage2 = new byte[subset]; for (int i = 0; i < garbage2.length; i++) { @@ -123,7 +123,7 @@ public class KllDoublesSketchComplexMetricSerdeTest final ByteBuffer buf2 = ByteBuffer.wrap(garbage2).order(ByteOrder.LITTLE_ENDIAN); Assert.assertThrows( - IndexOutOfBoundsException.class, + Exception.class, () -> objectStrategy.fromByteBufferSafe(buf2, garbage2.length).toByteArray() ); } @@ -132,7 +132,7 @@ public class KllDoublesSketchComplexMetricSerdeTest final byte[] garbage = new byte[]{0x01, 0x02}; final ByteBuffer buf3 = ByteBuffer.wrap(garbage).order(ByteOrder.LITTLE_ENDIAN); Assert.assertThrows( - IndexOutOfBoundsException.class, + Exception.class, () -> objectStrategy.fromByteBufferSafe(buf3, garbage.length).toByteArray() ); } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java index 56a39778990..ee505fe65b8 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchComplexMetricSerdeTest.java @@ -123,7 +123,7 @@ public class KllFloatsSketchComplexMetricSerdeTest final ByteBuffer buf2 = ByteBuffer.wrap(garbage2).order(ByteOrder.LITTLE_ENDIAN); Assert.assertThrows( - IndexOutOfBoundsException.class, + Exception.class, () -> objectStrategy.fromByteBufferSafe(buf2, garbage2.length).toByteArray() ); } @@ -132,7 +132,7 @@ public class KllFloatsSketchComplexMetricSerdeTest final byte[] garbage = new byte[]{0x01, 0x02}; final ByteBuffer buf3 = ByteBuffer.wrap(garbage).order(ByteOrder.LITTLE_ENDIAN); Assert.assertThrows( - IndexOutOfBoundsException.class, + Exception.class, () -> objectStrategy.fromByteBufferSafe(buf3, garbage.length).toByteArray() ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java index 3192813cfe1..0c858983875 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonDeserializer; import com.google.common.annotations.VisibleForTesting; import org.apache.datasketches.common.ArrayOfItemsSerDe; +import org.apache.datasketches.common.ByteArrayUtil; import org.apache.datasketches.memory.Memory; import org.apache.datasketches.memory.WritableMemory; import org.apache.datasketches.quantiles.ItemsSketch; @@ -126,22 +127,66 @@ public class QuantilesSketchKeyCollectorFactory } @Override - public byte[][] deserializeFromMemory(final Memory mem, final int numItems) + public byte[][] deserializeFromMemory(final Memory mem, long offsetBytes, final int numItems) { final byte[][] keys = new byte[numItems][]; - long keyPosition = (long) Integer.BYTES * numItems; + final long start = offsetBytes; + offsetBytes += (long) Integer.BYTES * numItems; for (int i = 0; i < numItems; i++) { - final int keyLength = mem.getInt((long) Integer.BYTES * i); + final int keyLength = mem.getInt(start + Integer.BYTES * i); final byte[] keyBytes = new byte[keyLength]; - mem.getByteArray(keyPosition, keyBytes, 0, keyLength); + mem.getByteArray(offsetBytes, keyBytes, 0, keyLength); keys[i] = keyBytes; - keyPosition += keyLength; + offsetBytes += keyLength; } return keys; } + + @Override + public byte[] serializeToByteArray(final byte[] item) + { + final byte[] bytes = new byte[Integer.BYTES + item.length]; + ByteArrayUtil.putIntLE(bytes, 0, item.length); + ByteArrayUtil.copyBytes(item, 0, bytes, Integer.BYTES, item.length); + return bytes; + } + + @Override + public byte[][] deserializeFromMemory(final Memory mem, final int numItems) + { + return deserializeFromMemory(mem, 0, numItems); + } + + @Override + public int sizeOf(final byte[] item) + { + return Integer.BYTES + item.length; + } + + @Override + public int sizeOf(final Memory mem, long offsetBytes, final int numItems) + { + int length = Integer.BYTES * numItems; + for (int i = 0; i < numItems; i++) { + length = mem.getInt(offsetBytes + Integer.BYTES * i); + } + return length; + } + + @Override + public String toString(final byte[] item) + { + return item.toString(); + } + + @Override + public Class<?> getClassOfT() + { + return byte[].class; + } } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java index e3f76b5b92f..178e191652e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java @@ -21,8 +21,9 @@ package org.apache.druid.indexing.common.task.batch.parallel.distribution; import org.apache.datasketches.common.ArrayOfItemsSerDe; import org.apache.datasketches.common.ArrayOfStringsSerDe; +import org.apache.datasketches.common.ByteArrayUtil; +import org.apache.datasketches.common.Util; import org.apache.datasketches.memory.Memory; -import org.apache.datasketches.memory.WritableMemory; import org.apache.datasketches.memory.internal.UnsafeUtil; import org.apache.druid.data.input.StringTuple; @@ -36,7 +37,7 @@ public class ArrayOfStringTuplesSerDe extends ArrayOfItemsSerDe<StringTuple> private static final ArrayOfStringsNullSafeSerde STRINGS_SERDE = new ArrayOfStringsNullSafeSerde(); @Override - public byte[] serializeToByteArray(StringTuple[] items) + public byte[] serializeToByteArray(final StringTuple[] items) { int length = 0; final byte[][] itemsBytes = new byte[items.length][]; @@ -49,29 +50,27 @@ public class ArrayOfStringTuplesSerDe extends ArrayOfItemsSerDe<StringTuple> } final byte[] bytes = new byte[length]; - final WritableMemory mem = WritableMemory.writableWrap(bytes); - long offsetBytes = 0; + int offsetBytes = 0; for (int i = 0; i < items.length; i++) { // Add the number of items in the StringTuple - mem.putInt(offsetBytes, items[i].size()); + ByteArrayUtil.putIntLE(bytes, offsetBytes, items[i].size()); offsetBytes += Integer.BYTES; // Add the size of byte content for the StringTuple - mem.putInt(offsetBytes, itemsBytes[i].length); + ByteArrayUtil.putIntLE(bytes, offsetBytes, itemsBytes[i].length); offsetBytes += Integer.BYTES; // Add the byte contents of the StringTuple - mem.putByteArray(offsetBytes, itemsBytes[i], 0, itemsBytes[i].length); + ByteArrayUtil.copyBytes(itemsBytes[i], 0, bytes, offsetBytes, itemsBytes[i].length); offsetBytes += itemsBytes[i].length; } return bytes; } @Override - public StringTuple[] deserializeFromMemory(Memory mem, int numItems) + public StringTuple[] deserializeFromMemory(final Memory mem, long offsetBytes, final int numItems) { final StringTuple[] array = new StringTuple[numItems]; - long offsetBytes = 0; for (int i = 0; i < numItems; i++) { // Read the number of items in the StringTuple UnsafeUtil.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity()); @@ -96,4 +95,67 @@ public class ArrayOfStringTuplesSerDe extends ArrayOfItemsSerDe<StringTuple> } return array; } + + @Override + public byte[] serializeToByteArray(final StringTuple item) + { + final byte[] itemBytes = STRINGS_SERDE.serializeToByteArray(item.toArray()); + final byte[] bytes = new byte[Integer.BYTES * 2 + itemBytes.length]; + int offsetBytes = 0; + ByteArrayUtil.putIntLE(bytes, offsetBytes, item.size()); + offsetBytes += Integer.BYTES; + ByteArrayUtil.putIntLE(bytes, offsetBytes, itemBytes.length); + offsetBytes += Integer.BYTES; + ByteArrayUtil.copyBytes(itemBytes, 0, bytes, offsetBytes, itemBytes.length); + return bytes; + } + + @Override + public StringTuple[] deserializeFromMemory(final Memory mem, final int numItems) + { + return deserializeFromMemory(mem, 0, numItems); + } + + @Override + public int sizeOf(final StringTuple item) + { + int length = Integer.BYTES; + for (final String s : item.toArray()) { + length += STRINGS_SERDE.sizeOf(s); + } + return length; + } + + @Override + public int sizeOf(final Memory mem, long offsetBytes, final int numItems) + { + final long start = offsetBytes; + for (int i = 0; i < numItems; i++) { + // Skip the number of items in the StringTuple + Util.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity()); + offsetBytes += Integer.BYTES; + + // Read the size of byte content + Util.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity()); + final int byteContentSize = mem.getInt(offsetBytes); + offsetBytes += Integer.BYTES; + + // Skip the byte content + Util.checkBounds(offsetBytes, byteContentSize, mem.getCapacity()); + offsetBytes += byteContentSize; + } + return (int) (offsetBytes - start); + } + + @Override + public String toString(final StringTuple item) + { + return item.toString(); + } + + @Override + public Class<?> getClassOfT() + { + return StringTuple.class; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java index b5a8393b172..406121c4086 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringsNullSafeSerde.java @@ -19,7 +19,6 @@ package org.apache.druid.indexing.common.task.batch.parallel.distribution; -import org.apache.datasketches.common.ArrayOfItemsSerDe; import org.apache.datasketches.common.ArrayOfStringsSerDe; import org.apache.datasketches.common.Util; import org.apache.datasketches.memory.Memory; @@ -35,7 +34,7 @@ import java.nio.charset.StandardCharsets; * The implementation is the same as {@link ArrayOfStringsSerDe}, except this * class handles null String values as well. */ -public class ArrayOfStringsNullSafeSerde extends ArrayOfItemsSerDe<String> +public class ArrayOfStringsNullSafeSerde extends ArrayOfStringsSerDe { private static final int NULL_STRING_LENGTH = -1; diff --git a/licenses.yaml b/licenses.yaml index a207fb1b0ac..eb2d63dd245 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -3477,7 +3477,7 @@ name: DataSketches license_category: binary module: java-core license_name: Apache License version 2.0 -version: 4.1.0 +version: 4.2.0 libraries: - org.apache.datasketches: datasketches-java diff --git a/pom.xml b/pom.xml index 26f35353436..e9431770dbf 100644 --- a/pom.xml +++ b/pom.xml @@ -86,7 +86,7 @@ default_config.fmpp --> <calcite.version>1.35.0</calcite.version> - <datasketches.version>4.1.0</datasketches.version> + <datasketches.version>4.2.0</datasketches.version> <datasketches.memory.version>2.2.0</datasketches.memory.version> <derby.version>10.14.2.0</derby.version> <dropwizard.metrics.version>4.2.19</dropwizard.metrics.version> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
