This is an automated email from the ASF dual-hosted git repository.
richardstartin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 44da1a098f DataTable deserialization improvements (#8499)
44da1a098f is described below
commit 44da1a098fd47ffb533da6ca1386c64fb86e50a9
Author: Richard Startin <[email protected]>
AuthorDate: Tue Apr 12 10:47:52 2022 +0100
DataTable deserialization improvements (#8499)
* avoid allocating MetaDataKey[]
* avoid excessive allocation in DataTable deserialization
* remove unused overload of DataSchema.fromBytes, ensure new overload is
tested, add comments about expectations of ByteBuffer arguments
---
.../org/apache/pinot/common/utils/DataSchema.java | 27 +++----
.../org/apache/pinot/common/utils/DataTable.java | 11 +--
.../apache/pinot/common/utils/DataSchemaTest.java | 3 +-
.../pinot/core/common/datatable/BaseDataTable.java | 17 ++---
.../core/common/datatable/DataTableImplV2.java | 25 ++-----
.../core/common/datatable/DataTableImplV3.java | 82 +++++++++-------------
.../core/common/datatable/DataTableUtils.java | 16 +++++
7 files changed, 81 insertions(+), 100 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
index 2f461bdbbe..915a8110ef 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
@@ -22,12 +22,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
+import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.EnumSet;
@@ -176,35 +175,29 @@ public class DataSchema {
return byteArrayOutputStream.toByteArray();
}
- public static DataSchema fromBytes(byte[] buffer)
+ /**
+   * This method uses relative operations on the ByteBuffer and expects the
buffer's position to be set correctly.
+ */
+ public static DataSchema fromBytes(ByteBuffer buffer)
throws IOException {
- ByteArrayInputStream byteArrayInputStream = new
ByteArrayInputStream(buffer);
- DataInputStream dataInputStream = new
DataInputStream(byteArrayInputStream);
-
// Read the number of columns.
- int numColumns = dataInputStream.readInt();
+ int numColumns = buffer.getInt();
String[] columnNames = new String[numColumns];
ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
-
// Read the column names.
- int readLength;
for (int i = 0; i < numColumns; i++) {
- int length = dataInputStream.readInt();
+ int length = buffer.getInt();
byte[] bytes = new byte[length];
- readLength = dataInputStream.read(bytes);
- assert readLength == length;
+ buffer.get(bytes);
columnNames[i] = new String(bytes, UTF_8);
}
-
// Read the column types.
for (int i = 0; i < numColumns; i++) {
- int length = dataInputStream.readInt();
+ int length = buffer.getInt();
byte[] bytes = new byte[length];
- readLength = dataInputStream.read(bytes);
- assert readLength == length;
+ buffer.get(bytes);
columnDataTypes[i] = ColumnDataType.valueOf(new String(bytes, UTF_8));
}
-
return new DataSchema(columnNames, columnDataTypes);
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
index 256d217299..f73ab72c2f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
@@ -19,6 +19,7 @@
package org.apache.pinot.common.utils;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
@@ -109,6 +110,7 @@ public interface DataTable {
SYSTEM_ACTIVITIES_CPU_TIME_NS("systemActivitiesCpuTimeNs",
MetadataValueType.LONG),
RESPONSE_SER_CPU_TIME_NS("responseSerializationCpuTimeNs",
MetadataValueType.LONG);
+ private static final MetadataKey[] VALUES;
private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new
HashMap<>();
private final String _name;
private final MetadataValueType _valueType;
@@ -121,10 +123,7 @@ public interface DataTable {
// getByOrdinal returns an enum key for a given ordinal or null if the key
does not exist.
@Nullable
public static MetadataKey getByOrdinal(int ordinal) {
- if (ordinal >= MetadataKey.values().length) {
- return null;
- }
- return MetadataKey.values()[ordinal];
+ return VALUES[Math.min(ordinal, VALUES.length - 1)];
}
// getByName returns an enum key for a given name or null if the key does
not exist.
@@ -143,11 +142,13 @@ public interface DataTable {
}
static {
- for (MetadataKey key : MetadataKey.values()) {
+ MetadataKey[] values = values();
+ for (MetadataKey key : values) {
if (NAME_TO_ENUM_KEY_MAP.put(key.getName(), key) != null) {
throw new IllegalArgumentException("Duplicate name defined in the
MetadataKey definition: " + key.getName());
}
}
+ VALUES = Arrays.copyOf(values, values.length + 1);
}
}
}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
index 04355b8fc0..ec18998161 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.common.utils;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.pinot.spi.data.FieldSpec;
import org.testng.Assert;
@@ -65,7 +66,7 @@ public class DataSchemaTest {
public void testSerDe()
throws Exception {
DataSchema dataSchema = new DataSchema(COLUMN_NAMES, COLUMN_DATA_TYPES);
- DataSchema dataSchemaAfterSerDe =
DataSchema.fromBytes(dataSchema.toBytes());
+ DataSchema dataSchemaAfterSerDe =
DataSchema.fromBytes(ByteBuffer.wrap(dataSchema.toBytes()));
Assert.assertEquals(dataSchema, dataSchemaAfterSerDe);
Assert.assertEquals(dataSchema.hashCode(),
dataSchemaAfterSerDe.hashCode());
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
index 941ffe382a..24d113bd7d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
@@ -18,9 +18,7 @@
*/
package org.apache.pinot.core.common.datatable;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -115,27 +113,24 @@ public abstract class BaseDataTable implements DataTable {
/**
* Helper method to deserialize dictionary map.
*/
- protected Map<String, Map<Integer, String>> deserializeDictionaryMap(byte[]
bytes)
+ protected Map<String, Map<Integer, String>>
deserializeDictionaryMap(ByteBuffer buffer)
throws IOException {
- try (ByteArrayInputStream byteArrayInputStream = new
ByteArrayInputStream(bytes);
- DataInputStream dataInputStream = new
DataInputStream(byteArrayInputStream)) {
- int numDictionaries = dataInputStream.readInt();
+ int numDictionaries = buffer.getInt();
Map<String, Map<Integer, String>> dictionaryMap = new
HashMap<>(numDictionaries);
for (int i = 0; i < numDictionaries; i++) {
- String column = DataTableUtils.decodeString(dataInputStream);
- int dictionarySize = dataInputStream.readInt();
+ String column = DataTableUtils.decodeString(buffer);
+ int dictionarySize = buffer.getInt();
Map<Integer, String> dictionary = new HashMap<>(dictionarySize);
for (int j = 0; j < dictionarySize; j++) {
- int key = dataInputStream.readInt();
- String value = DataTableUtils.decodeString(dataInputStream);
+ int key = buffer.getInt();
+ String value = DataTableUtils.decodeString(buffer);
dictionary.put(key, value);
}
dictionaryMap.put(column, dictionary);
}
return dictionaryMap;
- }
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
index 924ff24bd1..1340b45e19 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
@@ -18,9 +18,7 @@
*/
package org.apache.pinot.core.common.datatable;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -79,26 +77,20 @@ public class DataTableImplV2 extends BaseDataTable {
// Read dictionary.
if (dictionaryMapLength != 0) {
- byte[] dictionaryMapBytes = new byte[dictionaryMapLength];
byteBuffer.position(dictionaryMapStart);
- byteBuffer.get(dictionaryMapBytes);
- _dictionaryMap = deserializeDictionaryMap(dictionaryMapBytes);
+ _dictionaryMap = deserializeDictionaryMap(byteBuffer);
} else {
_dictionaryMap = null;
}
// Read metadata.
- byte[] metadataBytes = new byte[metadataLength];
byteBuffer.position(metadataStart);
- byteBuffer.get(metadataBytes);
- _metadata = deserializeMetadata(metadataBytes);
+ _metadata = deserializeMetadata(byteBuffer);
// Read data schema.
if (dataSchemaLength != 0) {
- byte[] schemaBytes = new byte[dataSchemaLength];
byteBuffer.position(dataSchemaStart);
- byteBuffer.get(schemaBytes);
- _dataSchema = DataSchema.fromBytes(schemaBytes);
+ _dataSchema = DataSchema.fromBytes(byteBuffer);
_columnOffsets = new int[_dataSchema.size()];
_rowSizeInBytes = DataTableUtils.computeColumnOffsets(_dataSchema,
_columnOffsets);
} else {
@@ -130,21 +122,18 @@ public class DataTableImplV2 extends BaseDataTable {
}
}
- private Map<String, String> deserializeMetadata(byte[] bytes)
+ private Map<String, String> deserializeMetadata(ByteBuffer buffer)
throws IOException {
- try (ByteArrayInputStream byteArrayInputStream = new
ByteArrayInputStream(bytes);
- DataInputStream dataInputStream = new
DataInputStream(byteArrayInputStream)) {
- int numEntries = dataInputStream.readInt();
+ int numEntries = buffer.getInt();
Map<String, String> metadata = new HashMap<>(numEntries);
for (int i = 0; i < numEntries; i++) {
- String key = DataTableUtils.decodeString(dataInputStream);
- String value = DataTableUtils.decodeString(dataInputStream);
+ String key = DataTableUtils.decodeString(buffer);
+ String value = DataTableUtils.decodeString(buffer);
metadata.put(key, value);
}
return metadata;
- }
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
index 97512357c7..7838c362fe 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
@@ -21,9 +21,7 @@ package org.apache.pinot.core.common.datatable;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -111,30 +109,24 @@ public class DataTableImplV3 extends BaseDataTable {
// Read exceptions.
if (exceptionsLength != 0) {
- byte[] exceptionsBytes = new byte[exceptionsLength];
byteBuffer.position(exceptionsStart);
- byteBuffer.get(exceptionsBytes);
- _errCodeToExceptionMap = deserializeExceptions(exceptionsBytes);
+ _errCodeToExceptionMap = deserializeExceptions(byteBuffer);
} else {
_errCodeToExceptionMap = new HashMap<>();
}
// Read dictionary.
if (dictionaryMapLength != 0) {
- byte[] dictionaryMapBytes = new byte[dictionaryMapLength];
byteBuffer.position(dictionaryMapStart);
- byteBuffer.get(dictionaryMapBytes);
- _dictionaryMap = deserializeDictionaryMap(dictionaryMapBytes);
+ _dictionaryMap = deserializeDictionaryMap(byteBuffer);
} else {
_dictionaryMap = null;
}
// Read data schema.
if (dataSchemaLength != 0) {
- byte[] schemaBytes = new byte[dataSchemaLength];
byteBuffer.position(dataSchemaStart);
- byteBuffer.get(schemaBytes);
- _dataSchema = DataSchema.fromBytes(schemaBytes);
+ _dataSchema = DataSchema.fromBytes(byteBuffer);
_columnOffsets = new int[_dataSchema.size()];
_rowSizeInBytes = DataTableUtils.computeColumnOffsets(_dataSchema,
_columnOffsets);
} else {
@@ -168,9 +160,7 @@ public class DataTableImplV3 extends BaseDataTable {
// Read metadata.
int metadataLength = byteBuffer.getInt();
if (metadataLength != 0) {
- byte[] metadataBytes = new byte[metadataLength];
- byteBuffer.get(metadataBytes);
- _metadata = deserializeMetadata(metadataBytes);
+ _metadata = deserializeMetadata(byteBuffer);
}
}
@@ -349,33 +339,32 @@ public class DataTableImplV3 extends BaseDataTable {
* DataTable from each server and aggregates the values).
* This is to make V3 implementation keep the consumers of Map<String,
String> getMetadata() API in the code happy
* by internally converting it.
+ *
+   * This method uses relative operations on the ByteBuffer and expects the
buffer's position to be set correctly.
*/
- private Map<String, String> deserializeMetadata(byte[] bytes)
+ private Map<String, String> deserializeMetadata(ByteBuffer buffer)
throws IOException {
- try (ByteArrayInputStream byteArrayInputStream = new
ByteArrayInputStream(bytes);
- DataInputStream dataInputStream = new
DataInputStream(byteArrayInputStream)) {
- int numEntries = dataInputStream.readInt();
- Map<String, String> metadata = new HashMap<>();
- for (int i = 0; i < numEntries; i++) {
- int keyId = dataInputStream.readInt();
- MetadataKey key = MetadataKey.getByOrdinal(keyId);
- // Ignore unknown keys.
- if (key == null) {
- continue;
- }
- if (key.getValueType() == MetadataValueType.INT) {
- String value =
String.valueOf(DataTableUtils.decodeInt(dataInputStream));
- metadata.put(key.getName(), value);
- } else if (key.getValueType() == MetadataValueType.LONG) {
- String value =
String.valueOf(DataTableUtils.decodeLong(dataInputStream));
- metadata.put(key.getName(), value);
- } else {
- String value =
String.valueOf(DataTableUtils.decodeString(dataInputStream));
- metadata.put(key.getName(), value);
- }
+ int numEntries = buffer.getInt();
+ Map<String, String> metadata = new HashMap<>();
+ for (int i = 0; i < numEntries; i++) {
+ int keyId = buffer.getInt();
+ MetadataKey key = MetadataKey.getByOrdinal(keyId);
+ // Ignore unknown keys.
+ if (key == null) {
+ continue;
+ }
+ if (key.getValueType() == MetadataValueType.INT) {
+ String value = "" + buffer.getInt();
+ metadata.put(key.getName(), value);
+ } else if (key.getValueType() == MetadataValueType.LONG) {
+ String value = "" + buffer.getLong();
+ metadata.put(key.getName(), value);
+ } else {
+ String value = DataTableUtils.decodeString(buffer);
+ metadata.put(key.getName(), value);
}
- return metadata;
}
+ return metadata;
}
private byte[] serializeExceptions()
@@ -397,18 +386,15 @@ public class DataTableImplV3 extends BaseDataTable {
return byteArrayOutputStream.toByteArray();
}
- private Map<Integer, String> deserializeExceptions(byte[] bytes)
+ private Map<Integer, String> deserializeExceptions(ByteBuffer buffer)
throws IOException {
- try (ByteArrayInputStream byteArrayInputStream = new
ByteArrayInputStream(bytes);
- DataInputStream dataInputStream = new
DataInputStream(byteArrayInputStream)) {
- int numExceptions = dataInputStream.readInt();
- Map<Integer, String> exceptions = new HashMap<>(numExceptions);
- for (int i = 0; i < numExceptions; i++) {
- int errCode = dataInputStream.readInt();
- String errMessage = DataTableUtils.decodeString(dataInputStream);
- exceptions.put(errCode, errMessage);
- }
- return exceptions;
+ int numExceptions = buffer.getInt();
+ Map<Integer, String> exceptions = new HashMap<>(numExceptions);
+ for (int i = 0; i < numExceptions; i++) {
+ int errCode = buffer.getInt();
+ String errMessage = DataTableUtils.decodeString(buffer);
+ exceptions.put(errCode, errMessage);
}
+ return exceptions;
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
index 632e6187ef..606a8d8fb4 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
@@ -22,6 +22,7 @@ import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import java.io.DataInputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -257,6 +258,21 @@ public class DataTableUtils {
}
}
+ /**
+ * Helper method to decode string.
+ */
+ public static String decodeString(ByteBuffer buffer)
+ throws IOException {
+ int length = buffer.getInt();
+ if (length == 0) {
+ return "";
+ } else {
+ byte[] bytes = new byte[length];
+ buffer.get(bytes);
+ return new String(bytes, UTF_8);
+ }
+ }
+
/**
* Helper method to decode int.
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]