This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new dacc6d0690 Cleanup old DataTable v2 and v3 (#13590)
dacc6d0690 is described below
commit dacc6d06907c44e83721454f1090e5f00c824f15
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Jul 11 23:07:49 2024 -0700
Cleanup old DataTable v2 and v3 (#13590)
---
.../pinot/common/datablock/BaseDataBlock.java | 4 +-
.../pinot/common/datatable/DataTableFactory.java | 15 +-
.../pinot/common/datatable/DataTableImplV2.java | 272 --------------
.../pinot/common/datatable/DataTableImplV3.java | 403 --------------------
.../pinot/common/datatable/DataTableImplV4.java | 39 +-
.../pinot/common/datatable/DataTableUtils.java | 18 +-
.../common/datatable/DataTableBuilderFactory.java | 39 +-
.../common/datatable/DataTableBuilderV2V3.java | 121 ------
.../core/query/request/ServerQueryRequest.java | 10 +-
.../core/common/datatable/DataTableSerDeTest.java | 405 +--------------------
.../apache/pinot/queries/AllNullQueriesTest.java | 2 -
.../pinot/queries/BigDecimalQueriesTest.java | 2 -
.../queries/BooleanNullEnabledQueriesTest.java | 2 -
.../pinot/queries/NullEnabledQueriesTest.java | 2 -
.../tests/NullHandlingIntegrationTest.java | 2 -
.../tests/OfflineClusterIntegrationTest.java | 3 -
.../query/runtime/queries/QueryRunnerTest.java | 2 -
.../runtime/queries/ResourceBasedQueriesTest.java | 2 -
18 files changed, 59 insertions(+), 1284 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java
index 333ac673b6..9b643b088d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java
@@ -28,7 +28,7 @@ import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
import org.apache.pinot.common.CustomObject;
-import org.apache.pinot.common.datatable.DataTableImplV3;
+import org.apache.pinot.common.datatable.DataTableImplV4;
import org.apache.pinot.common.datatable.DataTableUtils;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
@@ -43,7 +43,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
/**
- * Base data block mostly replicating implementation of {@link DataTableImplV3}.
+ * Base data block mostly replicating implementation of {@link DataTableImplV4}.
*
* +-----------------------------------------------+
* | 13 integers of header: |
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableFactory.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableFactory.java
index 55374d7cca..ec7762c605 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableFactory.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.common.datatable;
+import com.google.common.base.Preconditions;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -26,23 +27,13 @@ public class DataTableFactory {
private DataTableFactory() {
}
- public static final int VERSION_2 = 2;
- public static final int VERSION_3 = 3;
public static final int VERSION_4 = 4;
public static DataTable getDataTable(ByteBuffer byteBuffer)
throws IOException {
int version = byteBuffer.getInt();
- switch (version) {
- case VERSION_2:
- return new DataTableImplV2(byteBuffer);
- case VERSION_3:
- return new DataTableImplV3(byteBuffer);
- case VERSION_4:
- return new DataTableImplV4(byteBuffer);
- default:
- throw new IllegalStateException("Unsupported data table version: " +
version);
- }
+ Preconditions.checkState(version == VERSION_4, "Unsupported data table
version: %s", version);
+ return new DataTableImplV4(byteBuffer);
}
public static DataTable getDataTable(byte[] bytes)
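
Note: with V2 and V3 removed, deserialization now fails fast on any other version. A minimal
sketch of the resulting behavior (hypothetical buffer contents, usual Pinot imports assumed):

    // A V4 table round-trips as before: toBytes() writes the version first,
    // and the factory reads it back before constructing the table.
    DataTable table = new DataTableImplV4();
    DataTable decoded = DataTableFactory.getDataTable(table.toBytes());

    // A buffer leading with any other version now throws immediately (Java 9+ flip()):
    ByteBuffer stale = ByteBuffer.allocate(4).putInt(3).flip();
    DataTableFactory.getDataTable(stale); // IllegalStateException: Unsupported data table version: 3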
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV2.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV2.java
deleted file mode 100644
index 261e1edc46..0000000000
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV2.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.common.datatable;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.pinot.common.response.ProcessingException;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.HashUtil;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-
-public class DataTableImplV2 extends BaseDataTable {
- // VERSION
- // NUM_ROWS
- // NUM_COLUMNS
- // DICTIONARY_MAP (START|SIZE)
- // METADATA (START|SIZE)
- // DATA_SCHEMA (START|SIZE)
- // FIXED_SIZE_DATA (START|SIZE)
- // VARIABLE_SIZE_DATA (START|SIZE)
- private static final int HEADER_SIZE = Integer.BYTES * 13;
-
- /**
- * Construct data table with results. (Server side)
- */
- public DataTableImplV2(int numRows, DataSchema dataSchema, Map<String,
Map<Integer, String>> dictionaryMap,
- byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
- super(numRows, dataSchema, dictionaryMap, fixedSizeDataBytes,
variableSizeDataBytes);
- }
-
- /**
- * Construct empty data table. (Server side)
- */
- public DataTableImplV2() {
- }
-
- /**
- * Construct data table from byte array. (broker side)
- */
- public DataTableImplV2(ByteBuffer byteBuffer)
- throws IOException {
- // Read header.
- _numRows = byteBuffer.getInt();
- _numColumns = byteBuffer.getInt();
- int dictionaryMapStart = byteBuffer.getInt();
- int dictionaryMapLength = byteBuffer.getInt();
- int metadataStart = byteBuffer.getInt();
- int metadataLength = byteBuffer.getInt();
- int dataSchemaStart = byteBuffer.getInt();
- int dataSchemaLength = byteBuffer.getInt();
- int fixedSizeDataStart = byteBuffer.getInt();
- int fixedSizeDataLength = byteBuffer.getInt();
- int variableSizeDataStart = byteBuffer.getInt();
- int variableSizeDataLength = byteBuffer.getInt();
-
- // Read dictionary.
- if (dictionaryMapLength != 0) {
- byteBuffer.position(dictionaryMapStart);
- _dictionaryMap = deserializeDictionaryMap(byteBuffer);
- } else {
- _dictionaryMap = null;
- }
-
- // Read metadata.
- byteBuffer.position(metadataStart);
- _metadata = deserializeMetadata(byteBuffer);
-
- // Read data schema.
- if (dataSchemaLength != 0) {
- byteBuffer.position(dataSchemaStart);
- _dataSchema = DataSchema.fromBytes(byteBuffer);
- _columnOffsets = new int[_dataSchema.size()];
- _rowSizeInBytes = DataTableUtils.computeColumnOffsets(_dataSchema, _columnOffsets, getVersion());
- } else {
- _dataSchema = null;
- _columnOffsets = null;
- _rowSizeInBytes = 0;
- }
-
- // Read fixed size data.
- if (fixedSizeDataLength != 0) {
- _fixedSizeDataBytes = new byte[fixedSizeDataLength];
- byteBuffer.position(fixedSizeDataStart);
- byteBuffer.get(_fixedSizeDataBytes);
- _fixedSizeData = ByteBuffer.wrap(_fixedSizeDataBytes);
- } else {
- _fixedSizeDataBytes = null;
- _fixedSizeData = null;
- }
-
- // Read variable size data.
- if (variableSizeDataLength != 0) {
- _variableSizeDataBytes = new byte[variableSizeDataLength];
- byteBuffer.position(variableSizeDataStart);
- byteBuffer.get(_variableSizeDataBytes);
- _variableSizeData = ByteBuffer.wrap(_variableSizeDataBytes);
- } else {
- _variableSizeDataBytes = null;
- _variableSizeData = null;
- }
- }
-
- @Override
- public int getVersion() {
- return DataTableFactory.VERSION_2;
- }
-
- private Map<String, String> deserializeMetadata(ByteBuffer buffer)
- throws IOException {
- int numEntries = buffer.getInt();
- Map<String, String> metadata = new
HashMap<>(HashUtil.getHashMapCapacity(numEntries));
-
- for (int i = 0; i < numEntries; i++) {
- String key = DataTableUtils.decodeString(buffer);
- String value = DataTableUtils.decodeString(buffer);
- metadata.put(key, value);
- }
-
- return metadata;
- }
-
- @Override
- public void addException(ProcessingException processingException) {
- _metadata.put(EXCEPTION_METADATA_KEY + processingException.getErrorCode(), processingException.getMessage());
- }
-
- @Override
- public void addException(int errCode, String errMsg) {
- _metadata.put(EXCEPTION_METADATA_KEY + errCode, errMsg);
- }
-
- // getExceptions returns a map of errorCode->errMessage of the datatable.
- @Override
- public Map<Integer, String> getExceptions() {
- Map<Integer, String> exceptions = new HashMap<>();
- for (String key : _metadata.keySet()) {
- if (key.startsWith(EXCEPTION_METADATA_KEY)) {
- // In V2, all exceptions are added into metadata, using "Exception"+errCode as key,
- // Integer.parseInt(key.substring(9)) can extract the error code from the key.
- exceptions.put(Integer.parseInt(key.substring(9)), _metadata.get(key));
- }
- }
- return exceptions;
- }
-
- @Override
- public byte[] toBytes()
- throws IOException {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
- dataOutputStream.writeInt(DataTableFactory.VERSION_2);
- dataOutputStream.writeInt(_numRows);
- dataOutputStream.writeInt(_numColumns);
- int dataOffset = HEADER_SIZE;
-
- // Write dictionary.
- dataOutputStream.writeInt(dataOffset);
- byte[] dictionaryMapBytes = null;
- if (_dictionaryMap != null) {
- dictionaryMapBytes = serializeDictionaryMap();
- dataOutputStream.writeInt(dictionaryMapBytes.length);
- dataOffset += dictionaryMapBytes.length;
- } else {
- dataOutputStream.writeInt(0);
- }
-
- // Write metadata.
- dataOutputStream.writeInt(dataOffset);
- byte[] metadataBytes = serializeMetadata();
- dataOutputStream.writeInt(metadataBytes.length);
- dataOffset += metadataBytes.length;
-
- // Write data schema.
- dataOutputStream.writeInt(dataOffset);
- byte[] dataSchemaBytes = null;
- if (_dataSchema != null) {
- dataSchemaBytes = _dataSchema.toBytes();
- dataOutputStream.writeInt(dataSchemaBytes.length);
- dataOffset += dataSchemaBytes.length;
- } else {
- dataOutputStream.writeInt(0);
- }
-
- // Write fixed size data.
- dataOutputStream.writeInt(dataOffset);
- if (_fixedSizeDataBytes != null) {
- dataOutputStream.writeInt(_fixedSizeDataBytes.length);
- dataOffset += _fixedSizeDataBytes.length;
- } else {
- dataOutputStream.writeInt(0);
- }
-
- // Write variable size data.
- dataOutputStream.writeInt(dataOffset);
- if (_variableSizeDataBytes != null) {
- dataOutputStream.writeInt(_variableSizeDataBytes.length);
- } else {
- dataOutputStream.writeInt(0);
- }
-
- // Write actual data.
- if (dictionaryMapBytes != null) {
- dataOutputStream.write(dictionaryMapBytes);
- }
- dataOutputStream.write(metadataBytes);
- if (dataSchemaBytes != null) {
- dataOutputStream.write(dataSchemaBytes);
- }
- if (_fixedSizeDataBytes != null) {
- dataOutputStream.write(_fixedSizeDataBytes);
- }
- if (_variableSizeDataBytes != null) {
- dataOutputStream.write(_variableSizeDataBytes);
- }
-
- return byteArrayOutputStream.toByteArray();
- }
-
- @Override
- public DataTableImplV2 toMetadataOnlyDataTable() {
- DataTableImplV2 metadataOnlyDataTable = new DataTableImplV2();
- metadataOnlyDataTable._metadata.putAll(_metadata);
- return metadataOnlyDataTable;
- }
-
- @Override
- public DataTableImplV2 toDataOnlyDataTable() {
- return new DataTableImplV2(_numRows, _dataSchema, _dictionaryMap, _fixedSizeDataBytes, _variableSizeDataBytes);
- }
-
- private byte[] serializeMetadata()
- throws IOException {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
-
- dataOutputStream.writeInt(_metadata.size());
- for (Entry<String, String> entry : _metadata.entrySet()) {
- byte[] keyBytes = entry.getKey().getBytes(UTF_8);
- dataOutputStream.writeInt(keyBytes.length);
- dataOutputStream.write(keyBytes);
-
- byte[] valueBytes = entry.getValue().getBytes(UTF_8);
- dataOutputStream.writeInt(valueBytes.length);
- dataOutputStream.write(valueBytes);
- }
-
- return byteArrayOutputStream.toByteArray();
- }
-}
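
For reference, the deleted V2 format had no dedicated exceptions section; it piggy-backed
exceptions on the metadata map under "Exception"+errCode keys. A condensed sketch of that
(now removed) encoding, with a hypothetical error code and message:

    // V2: addException(errCode, errMsg) wrote straight into metadata.
    Map<String, String> metadata = new HashMap<>();
    metadata.put("Exception" + 200, "QueryExecutionError: ..."); // hypothetical entry

    // getExceptions() recovered the code by stripping the 9-character "Exception" prefix.
    Map<Integer, String> exceptions = new HashMap<>();
    for (Map.Entry<String, String> e : metadata.entrySet()) {
      if (e.getKey().startsWith("Exception")) {
        exceptions.put(Integer.parseInt(e.getKey().substring(9)), e.getValue());
      }
    }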
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV3.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV3.java
deleted file mode 100644
index af4b7eff46..0000000000
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV3.java
+++ /dev/null
@@ -1,403 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pinot.common.datatable;
-
-import com.google.common.primitives.Ints;
-import com.google.common.primitives.Longs;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.pinot.common.response.ProcessingException;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.HashUtil;
-import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-
-/**
- * Datatable V3 implementation.
- * The layout of serialized V3 datatable looks like:
- * +-----------------------------------------------+
- * | 13 integers of header: |
- * | VERSION |
- * | NUM_ROWS |
- * | NUM_COLUMNS |
- * | EXCEPTIONS SECTION START OFFSET |
- * | EXCEPTIONS SECTION LENGTH |
- * | DICTIONARY_MAP SECTION START OFFSET |
- * | DICTIONARY_MAP SECTION LENGTH |
- * | DATA_SCHEMA SECTION START OFFSET |
- * | DATA_SCHEMA SECTION LENGTH |
- * | FIXED_SIZE_DATA SECTION START OFFSET |
- * | FIXED_SIZE_DATA SECTION LENGTH |
- * | VARIABLE_SIZE_DATA SECTION START OFFSET |
- * | VARIABLE_SIZE_DATA SECTION LENGTH |
- * +-----------------------------------------------+
- * | EXCEPTIONS SECTION |
- * +-----------------------------------------------+
- * | DICTIONARY_MAP SECTION |
- * +-----------------------------------------------+
- * | DATA_SCHEMA SECTION |
- * +-----------------------------------------------+
- * | FIXED_SIZE_DATA SECTION |
- * +-----------------------------------------------+
- * | VARIABLE_SIZE_DATA SECTION |
- * +-----------------------------------------------+
- * | METADATA LENGTH |
- * | METADATA SECTION |
- * +-----------------------------------------------+
- */
-public class DataTableImplV3 extends BaseDataTable {
- private static final int HEADER_SIZE = Integer.BYTES * 13;
- // _errCodeToExceptionMap stores exceptions as a map of errorCode->errorMessage
- private final Map<Integer, String> _errCodeToExceptionMap;
-
- /**
- * Construct data table with results. (Server side)
- */
- public DataTableImplV3(int numRows, DataSchema dataSchema, Map<String,
Map<Integer, String>> dictionaryMap,
- byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
- super(numRows, dataSchema, dictionaryMap, fixedSizeDataBytes,
variableSizeDataBytes);
- _errCodeToExceptionMap = new HashMap<>();
- }
-
- /**
- * Construct empty data table. (Server side)
- */
- public DataTableImplV3() {
- _errCodeToExceptionMap = new HashMap<>();
- }
-
- /**
- * Construct data table from byte array. (broker side)
- */
- public DataTableImplV3(ByteBuffer byteBuffer)
- throws IOException {
- // Read header.
- _numRows = byteBuffer.getInt();
- _numColumns = byteBuffer.getInt();
- int exceptionsStart = byteBuffer.getInt();
- int exceptionsLength = byteBuffer.getInt();
- int dictionaryMapStart = byteBuffer.getInt();
- int dictionaryMapLength = byteBuffer.getInt();
- int dataSchemaStart = byteBuffer.getInt();
- int dataSchemaLength = byteBuffer.getInt();
- int fixedSizeDataStart = byteBuffer.getInt();
- int fixedSizeDataLength = byteBuffer.getInt();
- int variableSizeDataStart = byteBuffer.getInt();
- int variableSizeDataLength = byteBuffer.getInt();
-
- // Read exceptions.
- if (exceptionsLength != 0) {
- byteBuffer.position(exceptionsStart);
- _errCodeToExceptionMap = deserializeExceptions(byteBuffer);
- } else {
- _errCodeToExceptionMap = new HashMap<>();
- }
-
- // Read dictionary.
- if (dictionaryMapLength != 0) {
- byteBuffer.position(dictionaryMapStart);
- _dictionaryMap = deserializeDictionaryMap(byteBuffer);
- } else {
- _dictionaryMap = null;
- }
-
- // Read data schema.
- if (dataSchemaLength != 0) {
- byteBuffer.position(dataSchemaStart);
- _dataSchema = DataSchema.fromBytes(byteBuffer);
- _columnOffsets = new int[_dataSchema.size()];
- _rowSizeInBytes = DataTableUtils.computeColumnOffsets(_dataSchema, _columnOffsets, getVersion());
- } else {
- _dataSchema = null;
- _columnOffsets = null;
- _rowSizeInBytes = 0;
- }
-
- // Read fixed size data.
- if (fixedSizeDataLength != 0) {
- _fixedSizeDataBytes = new byte[fixedSizeDataLength];
- byteBuffer.position(fixedSizeDataStart);
- byteBuffer.get(_fixedSizeDataBytes);
- _fixedSizeData = ByteBuffer.wrap(_fixedSizeDataBytes);
- } else {
- _fixedSizeDataBytes = null;
- _fixedSizeData = null;
- }
-
- // Read variable size data.
- _variableSizeDataBytes = new byte[variableSizeDataLength];
- if (variableSizeDataLength != 0) {
- byteBuffer.position(variableSizeDataStart);
- byteBuffer.get(_variableSizeDataBytes);
- }
- _variableSizeData = ByteBuffer.wrap(_variableSizeDataBytes);
-
- // Read metadata.
- int metadataLength = byteBuffer.getInt();
- if (metadataLength != 0) {
- _metadata = deserializeMetadata(byteBuffer);
- }
- }
-
- @Override
- public int getVersion() {
- return DataTableFactory.VERSION_3;
- }
-
- @Override
- public void addException(ProcessingException processingException) {
- _errCodeToExceptionMap.put(processingException.getErrorCode(), processingException.getMessage());
- }
-
- @Override
- public void addException(int errCode, String errMsg) {
- _errCodeToExceptionMap.put(errCode, errMsg);
- }
-
- @Override
- public Map<Integer, String> getExceptions() {
- return _errCodeToExceptionMap;
- }
-
- @Override
- public byte[] toBytes()
- throws IOException {
- ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
-
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
- writeLeadingSections(dataOutputStream);
-
- // Add table serialization time metadata if thread timer is enabled.
- if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
- long responseSerializationCpuTimeNs = threadResourceUsageProvider.getThreadTimeNs();
- getMetadata().put(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), String.valueOf(responseSerializationCpuTimeNs));
- }
-
- // Write metadata: length followed by actual metadata bytes.
- // NOTE: We ignore metadata serialization time in "responseSerializationCpuTimeNs" as it's negligible while
- // considering it will bring a lot of code complexity.
- byte[] metadataBytes = serializeMetadata();
- dataOutputStream.writeInt(metadataBytes.length);
- dataOutputStream.write(metadataBytes);
-
- return byteArrayOutputStream.toByteArray();
- }
-
- @Override
- public DataTableImplV3 toMetadataOnlyDataTable() {
- DataTableImplV3 metadataOnlyDataTable = new DataTableImplV3();
- metadataOnlyDataTable._metadata.putAll(_metadata);
- metadataOnlyDataTable._errCodeToExceptionMap.putAll(_errCodeToExceptionMap);
- return metadataOnlyDataTable;
- }
-
- @Override
- public DataTableImplV3 toDataOnlyDataTable() {
- return new DataTableImplV3(_numRows, _dataSchema, _dictionaryMap, _fixedSizeDataBytes, _variableSizeDataBytes);
- }
-
- private void writeLeadingSections(DataOutputStream dataOutputStream)
- throws IOException {
- dataOutputStream.writeInt(DataTableFactory.VERSION_3);
- dataOutputStream.writeInt(_numRows);
- dataOutputStream.writeInt(_numColumns);
- int dataOffset = HEADER_SIZE;
-
- // Write exceptions section offset(START|SIZE).
- dataOutputStream.writeInt(dataOffset);
- byte[] exceptionsBytes;
- exceptionsBytes = serializeExceptions();
- dataOutputStream.writeInt(exceptionsBytes.length);
- dataOffset += exceptionsBytes.length;
-
- // Write dictionary map section offset(START|SIZE).
- dataOutputStream.writeInt(dataOffset);
- byte[] dictionaryMapBytes = null;
- if (_dictionaryMap != null) {
- dictionaryMapBytes = serializeDictionaryMap();
- dataOutputStream.writeInt(dictionaryMapBytes.length);
- dataOffset += dictionaryMapBytes.length;
- } else {
- dataOutputStream.writeInt(0);
- }
-
- // Write data schema section offset(START|SIZE).
- dataOutputStream.writeInt(dataOffset);
- byte[] dataSchemaBytes = null;
- if (_dataSchema != null) {
- dataSchemaBytes = _dataSchema.toBytes();
- dataOutputStream.writeInt(dataSchemaBytes.length);
- dataOffset += dataSchemaBytes.length;
- } else {
- dataOutputStream.writeInt(0);
- }
-
- // Write fixed size data section offset(START|SIZE).
- dataOutputStream.writeInt(dataOffset);
- if (_fixedSizeDataBytes != null) {
- dataOutputStream.writeInt(_fixedSizeDataBytes.length);
- dataOffset += _fixedSizeDataBytes.length;
- } else {
- dataOutputStream.writeInt(0);
- }
-
- // Write variable size data section offset(START|SIZE).
- dataOutputStream.writeInt(dataOffset);
- if (_variableSizeDataBytes != null) {
- dataOutputStream.writeInt(_variableSizeDataBytes.length);
- } else {
- dataOutputStream.writeInt(0);
- }
-
- // Write actual data.
- // Write exceptions bytes.
- dataOutputStream.write(exceptionsBytes);
- // Write dictionary map bytes.
- if (dictionaryMapBytes != null) {
- dataOutputStream.write(dictionaryMapBytes);
- }
- // Write data schema bytes.
- if (dataSchemaBytes != null) {
- dataOutputStream.write(dataSchemaBytes);
- }
- // Write fixed size data bytes.
- if (_fixedSizeDataBytes != null) {
- dataOutputStream.write(_fixedSizeDataBytes);
- }
- // Write variable size data bytes.
- if (_variableSizeDataBytes != null) {
- dataOutputStream.write(_variableSizeDataBytes);
- }
- }
-
- /**
- * Serialize metadata section to bytes.
- * Format of the bytes looks like:
- * [numEntries, bytesOfKV1, bytesOfKV2, bytesOfKV3]
- * For each KV pair:
- * - if the value type is String, encode it as: [enumKeyOrdinal, valueLength, Utf8EncodedValue].
- * - if the value type is int, encode it as: [enumKeyOrdinal, bigEndianRepresentationOfIntValue]
- * - if the value type is long, encode it as: [enumKeyOrdinal, bigEndianRepresentationOfLongValue]
- *
- * Unlike V2, where numeric metadata values (int and long) are encoded in UTF-8 in the wire format,
- * in V3 the big endian representation is used.
- */
- private byte[] serializeMetadata()
- throws IOException {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
-
- dataOutputStream.writeInt(_metadata.size());
-
- for (Map.Entry<String, String> entry : _metadata.entrySet()) {
- MetadataKey key = MetadataKey.getByName(entry.getKey());
- // Ignore unknown keys.
- if (key == null) {
- continue;
- }
- String value = entry.getValue();
- dataOutputStream.writeInt(key.getId());
- if (key.getValueType() == MetadataValueType.INT) {
- dataOutputStream.write(Ints.toByteArray(Integer.parseInt(value)));
- } else if (key.getValueType() == MetadataValueType.LONG) {
- dataOutputStream.write(Longs.toByteArray(Long.parseLong(value)));
- } else {
- byte[] valueBytes = value.getBytes(UTF_8);
- dataOutputStream.writeInt(valueBytes.length);
- dataOutputStream.write(valueBytes);
- }
- }
-
- return byteArrayOutputStream.toByteArray();
- }
-
- /**
- * Even though the wire format of V3 uses UTF-8 for string/bytes and big-endian for numeric values,
- * the in-memory representation is STRING based for processing the metadata before serialization
- * (by the server as it adds the statistics in metadata) and after deserialization (by the broker as it receives
- * DataTable from each server and aggregates the values).
- * This is to make the V3 implementation keep the consumers of the Map<String, String> getMetadata() API in the
- * code happy by internally converting it.
- *
- * This method uses relative operations on the ByteBuffer and expects the buffer's position to be set correctly.
- */
- private Map<String, String> deserializeMetadata(ByteBuffer buffer)
- throws IOException {
- int numEntries = buffer.getInt();
- Map<String, String> metadata = new HashMap<>();
- for (int i = 0; i < numEntries; i++) {
- int keyId = buffer.getInt();
- MetadataKey key = MetadataKey.getById(keyId);
- // Ignore unknown keys.
- if (key == null) {
- continue;
- }
- if (key.getValueType() == MetadataValueType.INT) {
- String value = "" + buffer.getInt();
- metadata.put(key.getName(), value);
- } else if (key.getValueType() == MetadataValueType.LONG) {
- String value = "" + buffer.getLong();
- metadata.put(key.getName(), value);
- } else {
- String value = DataTableUtils.decodeString(buffer);
- metadata.put(key.getName(), value);
- }
- }
- return metadata;
- }
-
- private byte[] serializeExceptions()
- throws IOException {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
-
- dataOutputStream.writeInt(_errCodeToExceptionMap.size());
-
- for (Map.Entry<Integer, String> entry : _errCodeToExceptionMap.entrySet())
{
- int key = entry.getKey();
- String value = entry.getValue();
- byte[] valueBytes = value.getBytes(UTF_8);
- dataOutputStream.writeInt(key);
- dataOutputStream.writeInt(valueBytes.length);
- dataOutputStream.write(valueBytes);
- }
-
- return byteArrayOutputStream.toByteArray();
- }
-
- private Map<Integer, String> deserializeExceptions(ByteBuffer buffer)
- throws IOException {
- int numExceptions = buffer.getInt();
- Map<Integer, String> exceptions = new
HashMap<>(HashUtil.getHashMapCapacity(numExceptions));
- for (int i = 0; i < numExceptions; i++) {
- int errCode = buffer.getInt();
- String errMessage = DataTableUtils.decodeString(buffer);
- exceptions.put(errCode, errMessage);
- }
- return exceptions;
- }
-}
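
The deleted V3 format introduced the dedicated exceptions section and the id-keyed binary
metadata encoding that V4 retains. A minimal sketch of the metadata wire format described in
serializeMetadata() above (the key and value are illustrative; runs inside a method declaring
throws IOException):

    // [numEntries][keyId][payload]... where the payload is:
    //   INT -> 4-byte big-endian, LONG -> 8-byte big-endian,
    //   STRING -> 4-byte length + UTF-8 bytes
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    out.writeInt(1);                                    // numEntries
    out.writeInt(MetadataKey.NUM_DOCS_SCANNED.getId()); // enum key id
    out.write(Longs.toByteArray(20L));                  // LONG payload, big-endian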
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
index b58669fd4b..a39a7d7e9f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
@@ -36,7 +36,6 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
-import org.apache.pinot.spi.annotations.InterfaceStability;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
@@ -46,9 +45,39 @@ import static java.nio.charset.StandardCharsets.UTF_8;
/**
- * Datatable V4 Implementation is a wrapper around the Row-based data block.
+ * Datatable V4 implementation.
+ *
+ * The layout of serialized V4 datatable looks like:
+ * +-----------------------------------------------+
+ * | 13 integers of header: |
+ * | VERSION |
+ * | NUM_ROWS |
+ * | NUM_COLUMNS |
+ * | EXCEPTIONS SECTION START OFFSET |
+ * | EXCEPTIONS SECTION LENGTH |
+ * | DICTIONARY_MAP SECTION START OFFSET |
+ * | DICTIONARY_MAP SECTION LENGTH |
+ * | DATA_SCHEMA SECTION START OFFSET |
+ * | DATA_SCHEMA SECTION LENGTH |
+ * | FIXED_SIZE_DATA SECTION START OFFSET |
+ * | FIXED_SIZE_DATA SECTION LENGTH |
+ * | VARIABLE_SIZE_DATA SECTION START OFFSET |
+ * | VARIABLE_SIZE_DATA SECTION LENGTH |
+ * +-----------------------------------------------+
+ * | EXCEPTIONS SECTION |
+ * +-----------------------------------------------+
+ * | DICTIONARY_MAP SECTION |
+ * +-----------------------------------------------+
+ * | DATA_SCHEMA SECTION |
+ * +-----------------------------------------------+
+ * | FIXED_SIZE_DATA SECTION |
+ * +-----------------------------------------------+
+ * | VARIABLE_SIZE_DATA SECTION |
+ * +-----------------------------------------------+
+ * | METADATA LENGTH |
+ * | METADATA SECTION |
+ * +-----------------------------------------------+
*/
[email protected]
public class DataTableImplV4 implements DataTable {
protected static final int HEADER_SIZE = Integer.BYTES * 13;
@@ -84,8 +113,8 @@ public class DataTableImplV4 implements DataTable {
_errCodeToExceptionMap = new HashMap<>();
}
- public DataTableImplV4(int numRows, DataSchema dataSchema, String[] stringDictionary,
- byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
+ public DataTableImplV4(int numRows, DataSchema dataSchema, String[] stringDictionary, byte[] fixedSizeDataBytes,
+ byte[] variableSizeDataBytes) {
_numRows = numRows;
_dataSchema = dataSchema;
_numColumns = dataSchema == null ? 0 : dataSchema.size();
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java
index a1ddd8d104..dc3b40bfd8 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java
@@ -39,6 +39,7 @@ public class DataTableUtils {
* @return row size in bytes.
*/
public static int computeColumnOffsets(DataSchema dataSchema, int[] columnOffsets, int dataTableVersion) {
+ assert dataTableVersion == DataTableFactory.VERSION_4;
int numColumns = columnOffsets.length;
assert numColumns == dataSchema.size();
@@ -48,26 +49,13 @@ public class DataTableUtils {
columnOffsets[i] = rowSizeInBytes;
switch (storedColumnDataTypes[i]) {
case INT:
- rowSizeInBytes += 4;
- break;
- case LONG:
- rowSizeInBytes += 8;
- break;
case FLOAT:
- if (dataTableVersion >= DataTableFactory.VERSION_4) {
- rowSizeInBytes += 4;
- } else {
- rowSizeInBytes += 8;
- }
- break;
- case DOUBLE:
- rowSizeInBytes += 8;
- break;
case STRING:
+ // For STRING, we store the dictionary id.
rowSizeInBytes += 4;
break;
- // Object and array. (POSITION|LENGTH)
default:
+ // This covers LONG, DOUBLE and variable length data types (POSITION|LENGTH).
rowSizeInBytes += 8;
break;
}
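
With only V4 left, the fixed-row sizing collapses to two cases: 4 bytes for INT, FLOAT and
STRING (a dictionary id), 8 bytes for everything else (LONG, DOUBLE, and the (POSITION|LENGTH)
pair of variable-size types). A worked example with a hypothetical schema (static imports of
the DataSchema.ColumnDataType constants assumed):

    int[] columnOffsets = new int[4];
    int rowSize = DataTableUtils.computeColumnOffsets(
        new DataSchema(new String[]{"i", "l", "s", "darr"},
            new DataSchema.ColumnDataType[]{INT, LONG, STRING, DOUBLE_ARRAY}),
        columnOffsets, DataTableFactory.VERSION_4);
    // columnOffsets == [0, 4, 12, 16], rowSize == 24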
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java
index d4e93e7476..40c438d388 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java
@@ -18,64 +18,35 @@
*/
package org.apache.pinot.core.common.datatable;
+import com.google.common.base.Preconditions;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTableFactory;
-import org.apache.pinot.common.datatable.DataTableImplV2;
-import org.apache.pinot.common.datatable.DataTableImplV3;
import org.apache.pinot.common.datatable.DataTableImplV4;
import org.apache.pinot.common.utils.DataSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class DataTableBuilderFactory {
private DataTableBuilderFactory() {
}
- private static final Logger LOGGER = LoggerFactory.getLogger(DataTableBuilderFactory.class);
-
public static final int DEFAULT_VERSION = DataTableFactory.VERSION_4;
- private static int _version = DEFAULT_VERSION;
-
public static int getDataTableVersion() {
- return _version;
+ return DEFAULT_VERSION;
}
public static void setDataTableVersion(int version) {
- LOGGER.info("Setting DataTable version to: {}", version);
- if (version != DataTableFactory.VERSION_2 && version !=
DataTableFactory.VERSION_3
- && version != DataTableFactory.VERSION_4) {
- throw new IllegalArgumentException("Unsupported version: " + version);
- }
- _version = version;
+ Preconditions.checkArgument(version == DEFAULT_VERSION, "Unsupported
version: " + version);
}
public static DataTableBuilder getDataTableBuilder(DataSchema dataSchema) {
- switch (_version) {
- case DataTableFactory.VERSION_2:
- case DataTableFactory.VERSION_3:
- return new DataTableBuilderV2V3(dataSchema, _version);
- case DataTableFactory.VERSION_4:
- return new DataTableBuilderV4(dataSchema);
- default:
- throw new IllegalStateException("Unsupported data table version: " +
_version);
- }
+ return new DataTableBuilderV4(dataSchema);
}
/**
* Returns an empty data table without data.
*/
public static DataTable getEmptyDataTable() {
- switch (_version) {
- case DataTableFactory.VERSION_2:
- return new DataTableImplV2();
- case DataTableFactory.VERSION_3:
- return new DataTableImplV3();
- case DataTableFactory.VERSION_4:
- return new DataTableImplV4();
- default:
- throw new IllegalStateException("Unsupported data table version: " +
_version);
- }
+ return new DataTableImplV4();
}
}
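
The builder version is now effectively a constant and setDataTableVersion() only validates.
A short usage sketch (dataSchema stands for any DataSchema):

    DataTableBuilder builder = DataTableBuilderFactory.getDataTableBuilder(dataSchema); // always V4
    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4); // accepted, no-op
    DataTableBuilderFactory.setDataTableVersion(3); // throws IllegalArgumentException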
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV2V3.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV2V3.java
deleted file mode 100644
index 375b1b4ad1..0000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV2V3.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.common.datatable;
-
-import com.google.common.base.Preconditions;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.apache.pinot.common.datatable.DataTable;
-import org.apache.pinot.common.datatable.DataTableFactory;
-import org.apache.pinot.common.datatable.DataTableImplV2;
-import org.apache.pinot.common.datatable.DataTableImplV3;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.spi.utils.ByteArray;
-import org.roaringbitmap.RoaringBitmap;
-
-
-/**
- * Kept for backward compatibility. Things improved in the newer versions:
- * - Float size (should be 4 instead of 8)
- * - Store bytes as variable size data instead of String
- * - Use one dictionary for all columns (save space)
- * - Support setting nullRowIds
- */
-public class DataTableBuilderV2V3 extends BaseDataTableBuilder {
- private final Map<String, Map<String, Integer>> _dictionaryMap = new
HashMap<>();
- private final Map<String, Map<Integer, String>> _reverseDictionaryMap = new
HashMap<>();
-
- public DataTableBuilderV2V3(DataSchema dataSchema, int version) {
- super(dataSchema, version);
- Preconditions.checkArgument(version <= DataTableFactory.VERSION_3);
- }
-
- @Override
- public void setColumn(int colId, String value) {
- String columnName = _dataSchema.getColumnName(colId);
- Map<String, Integer> dictionary = _dictionaryMap.get(columnName);
- if (dictionary == null) {
- dictionary = new HashMap<>();
- _dictionaryMap.put(columnName, dictionary);
- _reverseDictionaryMap.put(columnName, new HashMap<>());
- }
-
- _currentRowDataByteBuffer.position(_columnOffsets[colId]);
- Integer dictId = dictionary.get(value);
- if (dictId == null) {
- dictId = dictionary.size();
- dictionary.put(value, dictId);
- _reverseDictionaryMap.get(columnName).put(dictId, value);
- }
- _currentRowDataByteBuffer.putInt(dictId);
- }
-
- @Override
- public void setColumn(int colId, ByteArray value)
- throws IOException {
- setColumn(colId, value.toHexString());
- }
-
- @Override
- public void setColumn(int colId, String[] values)
- throws IOException {
- _currentRowDataByteBuffer.position(_columnOffsets[colId]);
- _currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
- _currentRowDataByteBuffer.putInt(values.length);
-
- String columnName = _dataSchema.getColumnName(colId);
- Map<String, Integer> dictionary = _dictionaryMap.get(columnName);
- if (dictionary == null) {
- dictionary = new HashMap<>();
- _dictionaryMap.put(columnName, dictionary);
- _reverseDictionaryMap.put(columnName, new HashMap<>());
- }
-
- for (String value : values) {
- Integer dictId = dictionary.get(value);
- if (dictId == null) {
- dictId = dictionary.size();
- dictionary.put(value, dictId);
- _reverseDictionaryMap.get(columnName).put(dictId, value);
- }
- _variableSizeDataOutputStream.writeInt(dictId);
- }
- }
-
- @Override
- public void setNullRowIds(@Nullable RoaringBitmap nullRowIds)
- throws IOException {
- throw new UnsupportedOperationException("Not supported before DataTable
V4");
- }
-
- @Override
- public DataTable build() {
- byte[] fixedSizeDataBytes = _fixedSizeDataByteArrayOutputStream.toByteArray();
- byte[] variableSizeDataBytes = _variableSizeDataByteArrayOutputStream.toByteArray();
- if (_version == DataTableFactory.VERSION_2) {
- return new DataTableImplV2(_numRows, _dataSchema, _reverseDictionaryMap, fixedSizeDataBytes,
- variableSizeDataBytes);
- } else {
- return new DataTableImplV3(_numRows, _dataSchema, _reverseDictionaryMap, fixedSizeDataBytes,
- variableSizeDataBytes);
- }
- }
-}
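
For reference, the deleted builder dictionary-encoded every STRING cell with one dictionary
per column, shipping the reverse map inside the serialized table. A condensed sketch of that
(now removed) scheme, with a hypothetical cell value:

    Map<String, Integer> dictionary = new HashMap<>();        // value -> id
    Map<Integer, String> reverseDictionary = new HashMap<>(); // id -> value, sent to the broker
    String value = "foo"; // hypothetical cell value
    Integer dictId = dictionary.get(value);
    if (dictId == null) {
      dictId = dictionary.size(); // ids assigned in insertion order
      dictionary.put(value, dictId);
      reverseDictionary.put(dictId, value);
    }
    // The 4-byte dictId, not the string, goes into the fixed-size row.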
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
index d4ce7857a5..87586fe154 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
@@ -18,16 +18,13 @@
*/
package org.apache.pinot.core.query.request;
-import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
-import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.common.request.PinotQuery;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.TimerContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
@@ -111,12 +108,7 @@ public class ServerQueryRequest {
}
private static QueryContext getQueryContext(PinotQuery pinotQuery) {
- QueryContext queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery);
- if (queryContext.isNullHandlingEnabled()) {
- Preconditions.checkState(DataTableBuilderFactory.getDataTableVersion() >= DataTableFactory.VERSION_4,
- "Null handling cannot be enabled for data table version smaller than 4");
- }
- return queryContext;
+ return QueryContextConverterUtils.getQueryContext(pinotQuery);
}
public long getRequestId() {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
index 91f1df4fa7..a539f8d542 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
@@ -18,16 +18,9 @@
*/
package org.apache.pinot.core.common.datatable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.primitives.Ints;
-import com.google.common.primitives.Longs;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import java.io.IOException;
import java.math.BigDecimal;
-import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.Map;
import java.util.Random;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
@@ -46,8 +39,6 @@ import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
/**
* Unit test for {@link DataTable} serialization/de-serialization.
@@ -77,30 +68,6 @@ public class DataTableSerDeTest {
private static final int[][] BOOLEAN_ARRAYS = new int[NUM_ROWS][];
private static final long[][] TIMESTAMP_ARRAYS = new long[NUM_ROWS][];
private static final String[][] STRING_ARRAYS = new String[NUM_ROWS][];
- private static final Map<String, String> EXPECTED_METADATA =
- ImmutableMap.<String,
String>builder().put(MetadataKey.NUM_DOCS_SCANNED.getName(),
String.valueOf(20L))
- .put(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(),
String.valueOf(5L))
- .put(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(),
String.valueOf(7L))
- .put(MetadataKey.NUM_SEGMENTS_QUERIED.getName(), String.valueOf(6))
- .put(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), String.valueOf(6))
- .put(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), String.valueOf(1))
- .put(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(),
String.valueOf(1))
- .put(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
String.valueOf(100L))
- .put(MetadataKey.TOTAL_DOCS.getName(), String.valueOf(200L))
- .put(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), "true")
- .put(MetadataKey.TIME_USED_MS.getName(),
String.valueOf(20000L)).put(MetadataKey.TRACE_INFO.getName(),
- "StudentException: Error finding students\n"
- + " at
StudentManager.findStudents(StudentManager.java:13)\n"
- + " at StudentProgram.main(StudentProgram.java:9)\n"
- + "Caused by: DAOException: Error querying students from
database\n"
- + " at StudentDAO.list(StudentDAO.java:11)\n"
- + " at
StudentManager.findStudents(StudentManager.java:11)\n" + " ... 1 more\n"
- + "Caused by: java.sql.SQLException: Syntax Error\n"
- + " at
DatabaseUtils.executeQuery(DatabaseUtils.java:5)\n"
- + " at StudentDAO.list(StudentDAO.java:8)\n" + "
... 2 more")
- .put(MetadataKey.REQUEST_ID.getName(), String.valueOf(90181881818L))
- .put(MetadataKey.NUM_RESIZES.getName(), String.valueOf(900L))
- .put(MetadataKey.RESIZE_TIME_MS.getName(),
String.valueOf(1919199L)).build();
@Test(dataProvider = "versionProvider")
public void testException(int dataTableVersion)
@@ -119,7 +86,6 @@ public class DataTableSerDeTest {
String actual = newDataTable.getExceptions().get(QueryException.QUERY_EXECUTION_ERROR.getErrorCode());
Assert.assertEquals(actual, expected);
- DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
}
@Test(dataProvider = "versionProvider")
@@ -146,15 +112,13 @@ public class DataTableSerDeTest {
new DataSchema(new String[]{"BYTES"}, new
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BYTES}),
numRows, new Object[]{emptyBytes});
- testEmptyValues(
- new DataSchema(new String[]{"BOOL_ARR"},
- new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN_ARRAY}),
- numRows, new Object[]{new int[]{}});
+ testEmptyValues(new DataSchema(new String[]{"BOOL_ARR"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN_ARRAY}), numRows,
+ new Object[]{new int[]{}});
- testEmptyValues(
- new DataSchema(new String[]{"BOOL_ARR"},
- new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN_ARRAY}),
- numRows, new Object[]{new int[]{0}});
+ testEmptyValues(new DataSchema(new String[]{"BOOL_ARR"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN_ARRAY}), numRows,
+ new Object[]{new int[]{0}});
testEmptyValues(
new DataSchema(new String[]{"INT_ARR"}, new
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT_ARRAY}),
@@ -186,7 +150,6 @@ public class DataTableSerDeTest {
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE_ARRAY}), numRows,
new Object[]{new double[]{0}});
}
- DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
}
private void testEmptyValues(DataSchema dataSchema, int numRows, Object[] emptyValues)
@@ -278,7 +241,6 @@ public class DataTableSerDeTest {
Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
- DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
}
@Test(dataProvider = "versionProvider")
@@ -301,7 +263,6 @@ public class DataTableSerDeTest {
verifyDataIsSame(newDataTable, columnDataType, 1, numRows);
}
}
- DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
}
@Test(dataProvider = "versionProvider")
@@ -338,346 +299,6 @@ public class DataTableSerDeTest {
Assert.assertNull(newDataTable.getMetadata().get(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName()));
Assert.assertTrue(
Integer.parseInt(newDataTable.getMetadata().get(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName())) > 0);
- DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
- }
-
- @Test
- public void testV3V4Compatibility()
- throws IOException {
- DataSchema.ColumnDataType[] columnDataTypes = DataSchema.ColumnDataType.values();
- int numColumns = columnDataTypes.length;
- String[] columnNames = new String[numColumns];
- for (int i = 0; i < numColumns; i++) {
- columnNames[i] = columnDataTypes[i].name();
- }
-
- DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
-
- // TODO: verify data table compatibility across multi-stage and normal query engine.
- // TODO: see https://github.com/apache/pinot/pull/8874/files#r894806085
-
- // Verify V4 broker can deserialize data table (has data, but has no metadata) send by V3 server
- ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(false);
- DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
- DataTableBuilder dataTableBuilderV3WithDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
- fillDataTableWithRandomData(dataTableBuilderV3WithDataOnly, columnDataTypes, numColumns);
-
- DataTable dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create a V3 data table
- DataTable newDataTable =
- DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3
- Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
- Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
- verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
- Assert.assertEquals(newDataTable.getMetadata().size(), 0);
-
- // Verify V4 broker can deserialize data table (has data and metadata) send by V3 server
- for (String key : EXPECTED_METADATA.keySet()) {
- dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
- }
- newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3
- Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
- Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
- verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
- Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-
- // Verify V4 broker can deserialize data table (only has metadata) send by V3 server
- DataTableBuilder dataTableBuilderV3WithMetadataDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
- dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a V3 data table
- for (String key : EXPECTED_METADATA.keySet()) {
- dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
- }
- newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3
- Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
- Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
- Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-
- // Verify V4 broker can deserialize (has data, but has no metadata) send by V4 server(with ThreadCpuTimeMeasurement
- // disabled)
- ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(false);
- DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
- DataTableBuilder dataTableBuilderV4WithDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
- fillDataTableWithRandomData(dataTableBuilderV4WithDataOnly, columnDataTypes, numColumns);
- DataTable dataTableV4 = dataTableBuilderV4WithDataOnly.build(); // create a V4 data table
- // Deserialize data table bytes as V4
- newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes());
- Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
- Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
- verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
- Assert.assertEquals(newDataTable.getMetadata().size(), 0);
-
- // Verify V4 broker can deserialize data table (has data and metadata) send by V4 server(with
- // ThreadCpuTimeMeasurement disabled)
- for (String key : EXPECTED_METADATA.keySet()) {
- dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key));
- }
- // Deserialize data table bytes as V4
- newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // Broker deserialize data table bytes as V4
- Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
- Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
- verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
- Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-
- // Verify V4 broker can deserialize data table (only has metadata) send by V4 server(with
- // ThreadCpuTimeMeasurement disabled)
- DataTableBuilder dataTableBuilderV4WithMetadataDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
- dataTableV4 = dataTableBuilderV4WithMetadataDataOnly.build(); // create a V4 data table
- for (String key : EXPECTED_METADATA.keySet()) {
- dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key));
- }
- newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // Broker deserialize data table bytes as V4
- Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
- Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
- Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-
- // Verify V4 broker can deserialize (has data, but has no metadata) send by V4 server(with ThreadCpuTimeMeasurement
- // enabled)
- ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
- DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
- dataTableV4 = dataTableBuilderV4WithDataOnly.build(); // create a V4 data table
- // Deserialize data table bytes as V4
- newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes());
- Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
- Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
- verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
- Assert.assertEquals(newDataTable.getMetadata().size(), 1);
- Assert.assertTrue(newDataTable.getMetadata().containsKey(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName()));
-
- // Verify V4 broker can deserialize data table (has data and metadata) send by V4 server(with
- // ThreadCpuTimeMeasurement enabled)
- for (String key : EXPECTED_METADATA.keySet()) {
- dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key));
- }
- // Deserialize data table bytes as V4
- newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // Broker deserialize data table bytes as V4
- Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
- Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
- verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
- if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
- Assert.assertEquals(newDataTable.getMetadata().size(), EXPECTED_METADATA.keySet().size() + 1);
- newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
- }
- Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-
- // Verify V4 broker can deserialize data table (only has metadata) send by V4 server(with
- // ThreadCpuTimeMeasurement enabled)
- dataTableV4 = dataTableBuilderV4WithMetadataDataOnly.build(); // create a V4 data table
- for (String key : EXPECTED_METADATA.keySet()) {
- dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key));
- }
- newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // Broker deserialize data table bytes as V4
- Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
- Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
- if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
- Assert.assertEquals(newDataTable.getMetadata().size(), EXPECTED_METADATA.keySet().size() + 1);
- newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
- }
- Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
- }
-
- @Test
- public void testV2V3Compatibility()
- throws IOException {
- DataSchema.ColumnDataType[] columnDataTypes = DataSchema.ColumnDataType.values();
- int numColumns = columnDataTypes.length;
- String[] columnNames = new String[numColumns];
- for (int i = 0; i < numColumns; i++) {
- columnNames[i] = columnDataTypes[i].name();
- }
-
- DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
-
- // Verify V3 broker can deserialize data table (has data, but has no metadata) send by V2 server
- ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(false);
- DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_2);
- DataTableBuilder dataTableBuilderV2WithDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
- fillDataTableWithRandomData(dataTableBuilderV2WithDataOnly, columnDataTypes, numColumns);
-
- DataTable dataTableV2 = dataTableBuilderV2WithDataOnly.build(); // create a V2 data table
- DataTable newDataTable =
- DataTableFactory.getDataTable(dataTableV2.toBytes()); // Broker deserialize data table bytes as V2
- Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
- Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
- verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
- Assert.assertEquals(newDataTable.getMetadata().size(), 0);
-
- // Verify V3 broker can deserialize data table (has data and metadata) send by V2 server
- for (String key : EXPECTED_METADATA.keySet()) {
- dataTableV2.getMetadata().put(key, EXPECTED_METADATA.get(key));
- }
- newDataTable = DataTableFactory.getDataTable(dataTableV2.toBytes()); //
Broker deserialize data table bytes as V2
- Assert.assertEquals(newDataTable.getDataSchema(), dataSchema,
ERROR_MESSAGE);
- Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS,
ERROR_MESSAGE);
- verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
- Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-
- // Verify V3 broker can deserialize data table (only has metadata) send by
V2 server
- DataTableBuilder dataTableBuilderV2WithMetadataDataOnly =
DataTableBuilderFactory.getDataTableBuilder(dataSchema);
- dataTableV2 = dataTableBuilderV2WithMetadataDataOnly.build(); // create a
V2 data table
- for (String key : EXPECTED_METADATA.keySet()) {
- dataTableV2.getMetadata().put(key, EXPECTED_METADATA.get(key));
- }
- newDataTable = DataTableFactory.getDataTable(dataTableV2.toBytes()); //
Broker deserialize data table bytes as V2
- Assert.assertEquals(newDataTable.getDataSchema(), dataSchema,
ERROR_MESSAGE);
- Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
- Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-
- // Verify V3 broker can deserialize (has data, but has no metadata) send
by V3 server(with ThreadCpuTimeMeasurement
- // disabled)
- ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(false);
- DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
- DataTableBuilder dataTableBuilderV3WithDataOnly =
DataTableBuilderFactory.getDataTableBuilder(dataSchema);
- fillDataTableWithRandomData(dataTableBuilderV3WithDataOnly,
columnDataTypes, numColumns);
- DataTable dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create
a V3 data table
- // Deserialize data table bytes as V3
- newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes());
- Assert.assertEquals(newDataTable.getDataSchema(), dataSchema,
ERROR_MESSAGE);
- Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS,
ERROR_MESSAGE);
- verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
- Assert.assertEquals(newDataTable.getMetadata().size(), 0);
-
- // Verify V3 broker can deserialize data table (has data and metadata)
send by V3 server(with
- // ThreadCpuTimeMeasurement disabled)
- for (String key : EXPECTED_METADATA.keySet()) {
- dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
- }
- // Deserialize data table bytes as V3
- newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); //
Broker deserialize data table bytes as V3
- Assert.assertEquals(newDataTable.getDataSchema(), dataSchema,
ERROR_MESSAGE);
- Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS,
ERROR_MESSAGE);
- verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
- Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-
- // Verify V3 broker can deserialize data table (only has metadata) send by
V3 server(with
- // ThreadCpuTimeMeasurement disabled)
- DataTableBuilder dataTableBuilderV3WithMetadataDataOnly =
DataTableBuilderFactory.getDataTableBuilder(dataSchema);
- dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a
V3 data table
- for (String key : EXPECTED_METADATA.keySet()) {
- dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
- }
- newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); //
Broker deserialize data table bytes as V3
- Assert.assertEquals(newDataTable.getDataSchema(), dataSchema,
ERROR_MESSAGE);
- Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
- Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-
- // Verify V3 broker can deserialize (has data, but has no metadata) send
by V3 server(with ThreadCpuTimeMeasurement
- // enabled)
- ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
- DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
- dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create a V3 data
table
- // Deserialize data table bytes as V3
- newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes());
- Assert.assertEquals(newDataTable.getDataSchema(), dataSchema,
ERROR_MESSAGE);
- Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS,
ERROR_MESSAGE);
- verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
- Assert.assertEquals(newDataTable.getMetadata().size(), 1);
-
Assert.assertTrue(newDataTable.getMetadata().containsKey(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName()));
-
- // Verify V3 broker can deserialize data table (has data and metadata)
send by V3 server(with
- // ThreadCpuTimeMeasurement enabled)
- for (String key : EXPECTED_METADATA.keySet()) {
- dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
- }
- // Deserialize data table bytes as V3
- newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); //
Broker deserialize data table bytes as V3
- Assert.assertEquals(newDataTable.getDataSchema(), dataSchema,
ERROR_MESSAGE);
- Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS,
ERROR_MESSAGE);
- verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
- if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
- Assert.assertEquals(newDataTable.getMetadata().size(),
EXPECTED_METADATA.keySet().size() + 1);
-
newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
- }
- Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-
- // Verify V3 broker can deserialize data table (only has metadata) send by
V3 server(with
- // ThreadCpuTimeMeasurement enabled)
- dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a
V3 data table
- for (String key : EXPECTED_METADATA.keySet()) {
- dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
- }
- newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); //
Broker deserialize data table bytes as V3
- Assert.assertEquals(newDataTable.getDataSchema(), dataSchema,
ERROR_MESSAGE);
- Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
- if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
- Assert.assertEquals(newDataTable.getMetadata().size(),
EXPECTED_METADATA.keySet().size() + 1);
-
newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
- }
- Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
- }
-
- @Test
- public void testDataTableVer3MetadataBytesLayout()
- throws IOException {
- DataSchema.ColumnDataType[] columnDataTypes = DataSchema.ColumnDataType.values();
- int numColumns = columnDataTypes.length;
- String[] columnNames = new String[numColumns];
- for (int i = 0; i < numColumns; i++) {
- columnNames[i] = columnDataTypes[i].name();
- }
-
- ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(false);
- DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
- DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
- DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
- fillDataTableWithRandomData(dataTableBuilder, columnDataTypes, numColumns);
-
- DataTable dataTable = dataTableBuilder.build();
-
- for (String key : EXPECTED_METADATA.keySet()) {
- dataTable.getMetadata().put(key, EXPECTED_METADATA.get(key));
- }
-
- ByteBuffer byteBuffer = ByteBuffer.wrap(dataTable.toBytes());
- int version = byteBuffer.getInt();
- Assert.assertEquals(version, DataTableFactory.VERSION_3);
- byteBuffer.getInt(); // numOfRows
- byteBuffer.getInt(); // numOfColumns
- byteBuffer.getInt(); // exceptionsStart
- byteBuffer.getInt(); // exceptionsLength
- byteBuffer.getInt(); // dictionaryMapStart
- byteBuffer.getInt(); // dictionaryMapLength
- byteBuffer.getInt(); // dataSchemaStart
- byteBuffer.getInt(); // dataSchemaLength
- byteBuffer.getInt(); // fixedSizeDataStart
- byteBuffer.getInt(); // fixedSizeDataLength
- int variableSizeDataStart = byteBuffer.getInt();
- int variableSizeDataLength = byteBuffer.getInt();
-
- int metadataStart = variableSizeDataStart + variableSizeDataLength;
- byteBuffer.position(metadataStart);
- int metadataLength = byteBuffer.getInt();
- byte[] metadataBytes = new byte[metadataLength];
- byteBuffer.get(metadataBytes);
-
- try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(metadataBytes);
- DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) {
- int numEntries = dataInputStream.readInt();
- // DataTable V3 and V4 serialization logic adds an extra RESPONSE_SER_CPU_TIME_NS KV pair into metadata, but
- // only when ThreadCpuTimeMeasurement is enabled (disabled above), so no extra entry is expected here.
- Assert.assertEquals(numEntries, EXPECTED_METADATA.size());
- for (int i = 0; i < numEntries; i++) {
- int keyOrdinal = dataInputStream.readInt();
- DataTable.MetadataKey key = MetadataKey.getById(keyOrdinal);
- Assert.assertNotEquals(key, null);
- if (key.getValueType() == DataTable.MetadataValueType.INT) {
- byte[] actualBytes = new byte[Integer.BYTES];
- dataInputStream.read(actualBytes);
- Assert.assertEquals(actualBytes, Ints.toByteArray(Integer.parseInt(EXPECTED_METADATA.get(key.getName()))));
- } else if (key.getValueType() == DataTable.MetadataValueType.LONG) {
- byte[] actualBytes = new byte[Long.BYTES];
- dataInputStream.read(actualBytes);
- // Ignore the THREAD_CPU_TIME_NS/SYSTEM_ACTIVITIES_CPU_TIME_NS/RESPONSE_SER_CPU_TIME_NS keys since their
- // values are evaluated during query execution.
- if (key != MetadataKey.THREAD_CPU_TIME_NS && key != MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS
- && key != MetadataKey.RESPONSE_SER_CPU_TIME_NS) {
- Assert.assertEquals(actualBytes, Longs.toByteArray(Long.parseLong(EXPECTED_METADATA.get(key.getName()))));
- }
- } else {
- int valueLength = dataInputStream.readInt();
- byte[] actualBytes = new byte[valueLength];
- dataInputStream.read(actualBytes);
- Assert.assertEquals(actualBytes, EXPECTED_METADATA.get(key.getName()).getBytes(UTF_8));
- }
- }
- }
- }
private void fillDataTableWithRandomData(DataTableBuilder dataTableBuilder,
@@ -689,18 +310,15 @@ public class DataTableSerDeTest {
private void fillDataTableWithRandomData(DataTableBuilder dataTableBuilder,
DataSchema.ColumnDataType[] columnDataTypes, int numColumns, int numRows)
throws IOException {
- RoaringBitmap[] nullBitmaps = null;
- if (DataTableBuilderFactory.getDataTableVersion() >= DataTableFactory.VERSION_4) {
- nullBitmaps = new RoaringBitmap[numColumns];
- for (int colId = 0; colId < numColumns; colId++) {
- nullBitmaps[colId] = new RoaringBitmap();
- }
+ RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
+ for (int colId = 0; colId < numColumns; colId++) {
+ nullBitmaps[colId] = new RoaringBitmap();
}
for (int rowId = 0; rowId < numRows; rowId++) {
dataTableBuilder.startRow();
for (int colId = 0; colId < numColumns; colId++) {
// Note: isNull is handled for SV columns only for now.
- boolean isNull = nullBitmaps != null && RANDOM.nextFloat() < 0.1;
+ boolean isNull = RANDOM.nextFloat() < 0.1;
if (isNull) {
nullBitmaps[colId].add(rowId);
}
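For reference, the simplified logic above now always tracks nulls per column. A minimal, self-contained sketch of that RoaringBitmap bookkeeping, using only the org.roaringbitmap API — the column count, row count, and ~10% null rate are illustrative values mirroring the test:

import java.util.Random;
import org.roaringbitmap.RoaringBitmap;

public class NullBitmapSketch {
  public static void main(String[] args) {
    int numColumns = 3;  // illustrative
    int numRows = 100;   // illustrative
    Random random = new Random(42);
    // One bitmap per column; a set bit at rowId marks that cell as null.
    RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
    for (int colId = 0; colId < numColumns; colId++) {
      nullBitmaps[colId] = new RoaringBitmap();
    }
    for (int rowId = 0; rowId < numRows; rowId++) {
      for (int colId = 0; colId < numColumns; colId++) {
        if (random.nextFloat() < 0.1) { // ~10% nulls, as in the test above
          nullBitmaps[colId].add(rowId);
        }
      }
    }
    for (int colId = 0; colId < numColumns; colId++) {
      System.out.println("column " + colId + ": " + nullBitmaps[colId].getCardinality() + " nulls");
    }
  }
}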
@@ -931,8 +549,7 @@ public class DataTableSerDeTest {
@DataProvider(name = "versionProvider")
public Object[][] provideVersion() {
return new Object[][]{
- new Object[]{DataTableFactory.VERSION_4},
- new Object[]{DataTableFactory.VERSION_3},
+ new Object[]{DataTableFactory.VERSION_4}
};
}
}
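The removed layout test above is the last place the V3 wire format was spelled out: a 13-integer header (version, numOfRows, numOfColumns, then start/length pairs for exceptions, dictionary map, data schema, fixed-size data, and variable-size data), followed by a length-prefixed metadata block placed right after the variable-size data section. A hypothetical, condensed sketch of that header walk — not part of the codebase; decoding individual entries would additionally require the MetadataKey registry to know each value's width:

import java.nio.ByteBuffer;

public class V3LayoutSketch {
  // Locates the metadata block in a serialized V3 data table and reports its size.
  static void dumpMetadataHeader(byte[] serialized) {
    ByteBuffer buf = ByteBuffer.wrap(serialized);
    int version = buf.getInt(); // 3 for the removed format
    for (int i = 0; i < 10; i++) {
      buf.getInt(); // numOfRows .. fixedSizeDataLength, skipped here
    }
    int variableSizeDataStart = buf.getInt();
    int variableSizeDataLength = buf.getInt();
    // The metadata block sits immediately after the variable-size data section.
    buf.position(variableSizeDataStart + variableSizeDataLength);
    int metadataLength = buf.getInt();
    byte[] metadataBytes = new byte[metadataLength];
    buf.get(metadataBytes);
    // metadataBytes holds an int numEntries, then per entry: an int key ordinal and a
    // typed value (INT -> 4 bytes, LONG -> 8 bytes, STRING -> int length + UTF-8 bytes).
    System.out.println("version=" + version + ", metadata block=" + metadataLength + " bytes");
  }
}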
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/AllNullQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/AllNullQueriesTest.java
index a2a720edfc..234ccfc0bf 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/AllNullQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/AllNullQueriesTest.java
@@ -30,7 +30,6 @@ import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -658,7 +657,6 @@ public class AllNullQueriesTest extends BaseQueriesTest {
query.verify(columnDataType, brokerResponse);
- DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
_indexSegment.destroy();
FileUtils.deleteDirectory(indexDir);
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java
index 3d50c89484..572a6d5c0e 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java
@@ -32,7 +32,6 @@ import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -428,7 +427,6 @@ public class BigDecimalQueriesTest extends BaseQueriesTest {
i++;
}
}
- DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
}
@AfterClass
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/BooleanNullEnabledQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/BooleanNullEnabledQueriesTest.java
index 481634f116..bf9fe6e98f 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/BooleanNullEnabledQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/BooleanNullEnabledQueriesTest.java
@@ -31,7 +31,6 @@ import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -447,7 +446,6 @@ public class BooleanNullEnabledQueriesTest extends BaseQueriesTest {
assertEquals(thirdRow[0], (long) _nullValuesCount * 4);
assertNull(thirdRow[1]);
}
- DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
}
@AfterClass
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/NullEnabledQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/NullEnabledQueriesTest.java
index 41a664c5f1..bba3aa4b53 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/NullEnabledQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/NullEnabledQueriesTest.java
@@ -32,7 +32,6 @@ import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -733,7 +732,6 @@ public class NullEnabledQueriesTest extends BaseQueriesTest {
assertNull(rows.get(rows.size() - 1)[0]);
}
}
- DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
}
@AfterClass
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
index 7a838052f9..cdf0af03bd 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
@@ -23,7 +23,6 @@ import java.io.File;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -206,7 +205,6 @@ public class NullHandlingIntegrationTest extends BaseClusterIntegrationTestSet {
pinotQuery = "SELECT COUNT(1) FROM " + getTableName() + " option(enableNullHandling=true)";
h2Query = "SELECT COUNT(1) FROM " + getTableName();
testQuery(pinotQuery, h2Query);
- DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
}
@Test(dataProvider = "nullLiteralQueries")
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 6fb23d96f6..48c703b653 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -50,7 +50,6 @@ import org.apache.hc.core5.http.message.BasicNameValuePair;
import org.apache.helix.model.IdealState;
import org.apache.pinot.client.PinotConnection;
import org.apache.pinot.client.PinotDriver;
-import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -59,7 +58,6 @@ import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.common.utils.http.HttpClient;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
@@ -190,7 +188,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
@BeforeClass
public void setUp()
throws Exception {
- DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
// Start the Pinot cluster
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
index 67e22a66d0..65f6906b68 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.response.broker.ResultTable;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.query.QueryEnvironmentTestBase;
import org.apache.pinot.query.QueryServerEnclosure;
import org.apache.pinot.query.mailbox.MailboxService;
@@ -143,7 +142,6 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
@AfterClass
public void tearDown() {
- DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
for (QueryServerEnclosure server : _servers.values()) {
server.shutDown();
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 13e35af7ec..9ffce6810d 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -41,7 +41,6 @@ import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.query.QueryEnvironmentTestBase;
import org.apache.pinot.query.QueryServerEnclosure;
import org.apache.pinot.query.mailbox.MailboxService;
@@ -249,7 +248,6 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
public void tearDown() {
// Restore the original default timezone
TimeZone.setDefault(_currentSystemTimeZone);
- DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
for (QueryServerEnclosure server : _servers.values()) {
server.shutDown();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]