This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d778df2c0f Optimize ser/de to avoid using output stream (#9278)
d778df2c0f is described below
commit d778df2c0fe6674387bceb30ab20343ac381ee2a
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Aug 25 10:32:28 2022 -0700
Optimize ser/de to avoid using output stream (#9278)
---
.../apache/pinot/core/common/ObjectSerDeUtils.java | 168 ++++++++++-----------
.../apache/pinot/perf/BenchmarkObjectSerDe.java | 99 ++++++++++++
2 files changed, 183 insertions(+), 84 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 0cd922eb6c..95066b1460 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.common;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import com.tdunning.math.stats.MergingDigest;
import com.tdunning.math.stats.TDigest;
@@ -45,14 +46,11 @@ import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongSet;
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
import it.unimi.dsi.fastutil.objects.ObjectSet;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -540,39 +538,39 @@ public class ObjectSerDeUtils {
return new byte[Integer.BYTES];
}
- // No need to close these 2 streams
- ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
-
- try {
- // Write the size of the map
- dataOutputStream.writeInt(size);
-
- // Write the serialized key-value pairs
- Iterator<Map.Entry<Object, Object>> iterator =
map.entrySet().iterator();
- // First write the key type and value type
- Map.Entry<Object, Object> firstEntry = iterator.next();
- Object firstKey = firstEntry.getKey();
- Object firstValue = firstEntry.getValue();
- int keyTypeValue = ObjectType.getObjectType(firstKey).getValue();
- int valueTypeValue = ObjectType.getObjectType(firstValue).getValue();
- dataOutputStream.writeInt(keyTypeValue);
- dataOutputStream.writeInt(valueTypeValue);
- // Then write each key-value pair
- for (Map.Entry<Object, Object> entry : map.entrySet()) {
- byte[] keyBytes = ObjectSerDeUtils.serialize(entry.getKey(),
keyTypeValue);
- dataOutputStream.writeInt(keyBytes.length);
- dataOutputStream.write(keyBytes);
-
- byte[] valueBytes = ObjectSerDeUtils.serialize(entry.getValue(),
valueTypeValue);
- dataOutputStream.writeInt(valueBytes.length);
- dataOutputStream.write(valueBytes);
- }
- } catch (IOException e) {
- throw new RuntimeException("Caught exception while serializing Map",
e);
+ // Besides the value bytes, we store: size, key type, value type, length
for each key, length for each value
+ long bufferSize = (3 + 2 * (long) size) * Integer.BYTES;
+ byte[][] keyBytesArray = new byte[size][];
+ byte[][] valueBytesArray = new byte[size][];
+ Map.Entry<Object, Object> firstEntry = map.entrySet().iterator().next();
+ int keyTypeValue =
ObjectType.getObjectType(firstEntry.getKey()).getValue();
+ int valueTypeValue =
ObjectType.getObjectType(firstEntry.getValue()).getValue();
+ ObjectSerDe keySerDe = SER_DES[keyTypeValue];
+ ObjectSerDe valueSerDe = SER_DES[valueTypeValue];
+ int index = 0;
+ for (Map.Entry<Object, Object> entry : map.entrySet()) {
+ byte[] keyBytes = keySerDe.serialize(entry.getKey());
+ bufferSize += keyBytes.length;
+ keyBytesArray[index] = keyBytes;
+ byte[] valueBytes = valueSerDe.serialize(entry.getValue());
+ bufferSize += valueBytes.length;
+ valueBytesArray[index++] = valueBytes;
}
-
- return byteArrayOutputStream.toByteArray();
+ Preconditions.checkState(bufferSize <= Integer.MAX_VALUE, "Buffer size
exceeds 2GB");
+ byte[] bytes = new byte[(int) bufferSize];
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ byteBuffer.putInt(size);
+ byteBuffer.putInt(keyTypeValue);
+ byteBuffer.putInt(valueTypeValue);
+ for (int i = 0; i < index; i++) {
+ byte[] keyBytes = keyBytesArray[i];
+ byteBuffer.putInt(keyBytes.length);
+ byteBuffer.put(keyBytes);
+ byte[] valueBytes = valueBytesArray[i];
+ byteBuffer.putInt(valueBytes.length);
+ byteBuffer.put(valueBytes);
+ }
+ return bytes;
}
@Override
@@ -736,20 +734,24 @@ public class ObjectSerDeUtils {
@Override
public byte[] serialize(Set<String> stringSet) {
int size = stringSet.size();
- // NOTE: No need to close the ByteArrayOutputStream.
- ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
- try {
- dataOutputStream.writeInt(size);
- for (String value : stringSet) {
- byte[] bytes = value.getBytes(UTF_8);
- dataOutputStream.writeInt(bytes.length);
- dataOutputStream.write(bytes);
- }
- } catch (IOException e) {
- throw new RuntimeException("Caught exception while serializing
Set<String>", e);
+ // Besides the value bytes, we store: size, length for each value
+ long bufferSize = (1 + (long) size) * Integer.BYTES;
+ byte[][] valueBytesArray = new byte[size][];
+ int index = 0;
+ for (String value : stringSet) {
+ byte[] valueBytes = value.getBytes(UTF_8);
+ bufferSize += valueBytes.length;
+ valueBytesArray[index++] = valueBytes;
}
- return byteArrayOutputStream.toByteArray();
+ Preconditions.checkState(bufferSize <= Integer.MAX_VALUE, "Buffer size
exceeds 2GB");
+ byte[] bytes = new byte[(int) bufferSize];
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ byteBuffer.putInt(size);
+ for (byte[] valueBytes : valueBytesArray) {
+ byteBuffer.putInt(valueBytes.length);
+ byteBuffer.put(valueBytes);
+ }
+ return bytes;
}
@Override
@@ -776,20 +778,21 @@ public class ObjectSerDeUtils {
@Override
public byte[] serialize(Set<ByteArray> bytesSet) {
int size = bytesSet.size();
- // NOTE: No need to close the ByteArrayOutputStream.
- ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
- try {
- dataOutputStream.writeInt(size);
- for (ByteArray value : bytesSet) {
- byte[] bytes = value.getBytes();
- dataOutputStream.writeInt(bytes.length);
- dataOutputStream.write(bytes);
- }
- } catch (IOException e) {
- throw new RuntimeException("Caught exception while serializing
Set<ByteArray>", e);
+ // Besides the value bytes, we store: size, length for each value
+ long bufferSize = (1 + (long) size) * Integer.BYTES;
+ for (ByteArray value : bytesSet) {
+ bufferSize += value.length();
+ }
+ Preconditions.checkState(bufferSize <= Integer.MAX_VALUE, "Buffer size
exceeds 2GB");
+ byte[] bytes = new byte[(int) bufferSize];
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ byteBuffer.putInt(size);
+ for (ByteArray value : bytesSet) {
+ byte[] valueBytes = value.getBytes();
+ byteBuffer.putInt(valueBytes.length);
+ byteBuffer.put(valueBytes);
}
- return byteArrayOutputStream.toByteArray();
+ return bytes;
}
@Override
@@ -941,30 +944,27 @@ public class ObjectSerDeUtils {
return new byte[Integer.BYTES];
}
- // No need to close these 2 streams (close() is no-op)
- ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
-
- try {
- // Write the size of the list
- dataOutputStream.writeInt(size);
-
- // Write the value type
- Object firstValue = list.get(0);
- int valueType = ObjectType.getObjectType(firstValue).getValue();
- dataOutputStream.writeInt(valueType);
-
- // Write the serialized values
- for (Object value : list) {
- byte[] bytes = ObjectSerDeUtils.serialize(value, valueType);
- dataOutputStream.writeInt(bytes.length);
- dataOutputStream.write(bytes);
- }
- } catch (IOException e) {
- throw new RuntimeException("Caught exception while serializing List",
e);
+ // Besides the value bytes, we store: size, value type, length for each
value
+ long bufferSize = (2 + (long) size) * Integer.BYTES;
+ byte[][] valueBytesArray = new byte[size][];
+ int valueType = ObjectType.getObjectType(list.get(0)).getValue();
+ ObjectSerDe serDe = SER_DES[valueType];
+ int index = 0;
+ for (Object value : list) {
+ byte[] valueBytes = serDe.serialize(value);
+ bufferSize += valueBytes.length;
+ valueBytesArray[index++] = valueBytes;
}
-
- return byteArrayOutputStream.toByteArray();
+ Preconditions.checkState(bufferSize <= Integer.MAX_VALUE, "Buffer size
exceeds 2GB");
+ byte[] bytes = new byte[(int) bufferSize];
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ byteBuffer.putInt(size);
+ byteBuffer.putInt(valueType);
+ for (byte[] valueBytes : valueBytesArray) {
+ byteBuffer.putInt(valueBytes.length);
+ byteBuffer.put(valueBytes);
+ }
+ return bytes;
}
@Override
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkObjectSerDe.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkObjectSerDe.java
new file mode 100644
index 0000000000..657ea3ef00
--- /dev/null
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkObjectSerDe.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(1)
+@Warmup(iterations = 3, time = 10)
+@Measurement(iterations = 5, time = 10)
+@State(Scope.Benchmark)
+public class BenchmarkObjectSerDe {
+ private static final int NUM_VALUES = 5_000_000;
+
+ List<String> _stringList = new ArrayList<>(NUM_VALUES);
+ Set<String> _stringSet = new ObjectOpenHashSet<>(NUM_VALUES);
+ Map<String, String> _stringToStringMap = new
HashMap<>(HashUtil.getHashMapCapacity(NUM_VALUES));
+ Set<ByteArray> _bytesSet = new ObjectOpenHashSet<>(NUM_VALUES);
+
+ @Setup
+ public void setUp()
+ throws IOException {
+ for (int i = 0; i < NUM_VALUES; i++) {
+ String stringValue = RandomStringUtils.randomAlphanumeric(10, 201);
+ _stringList.add(stringValue);
+ _stringSet.add(stringValue);
+ _stringToStringMap.put(stringValue, stringValue);
+ _bytesSet.add(new ByteArray(stringValue.getBytes(UTF_8)));
+ }
+ }
+
+ @Benchmark
+ public int stringList() {
+ return ObjectSerDeUtils.serialize(_stringList).length;
+ }
+
+ @Benchmark
+ public int stringSet() {
+ return ObjectSerDeUtils.serialize(_stringSet).length;
+ }
+
+ @Benchmark
+ public int stringToStringMap() {
+ return ObjectSerDeUtils.serialize(_stringToStringMap).length;
+ }
+
+ @Benchmark
+ public int bytesSet() {
+ return ObjectSerDeUtils.serialize(_bytesSet).length;
+ }
+
+ public static void main(String[] args)
+ throws Exception {
+ new Runner(new
OptionsBuilder().include(BenchmarkObjectSerDe.class.getSimpleName()).build()).run();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]