mqliang commented on a change in pull request #6710: URL: https://github.com/apache/incubator-pinot/pull/6710#discussion_r606099782
########## File path: pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java ########## @@ -0,0 +1,283 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.common.datatable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataTable; +import org.apache.pinot.common.utils.StringUtil; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.spi.utils.ByteArray; +import org.apache.pinot.spi.utils.BytesUtils; + +import static org.apache.pinot.core.common.datatable.DataTableUtils.decodeString; + + +/** + * Base implementation of the DataTable interface. + */ +public abstract class BaseDataTable implements DataTable { + protected int _numRows; + protected int _numColumns; + protected DataSchema _dataSchema; + protected int[] _columnOffsets; + protected int _rowSizeInBytes; + protected Map<String, Map<Integer, String>> _dictionaryMap; + protected byte[] _fixedSizeDataBytes; + protected ByteBuffer _fixedSizeData; + protected byte[] _variableSizeDataBytes; + protected ByteBuffer _variableSizeData; + protected Map<String, String> _metadata; + + public BaseDataTable(int numRows, DataSchema dataSchema, Map<String, Map<Integer, String>> dictionaryMap, + byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) { + _numRows = numRows; + _numColumns = dataSchema.size(); + _dataSchema = dataSchema; + _columnOffsets = new int[_numColumns]; + _rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema, _columnOffsets); + _dictionaryMap = dictionaryMap; + _fixedSizeDataBytes = fixedSizeDataBytes; + _fixedSizeData = ByteBuffer.wrap(fixedSizeDataBytes); + _variableSizeDataBytes = variableSizeDataBytes; + _variableSizeData = ByteBuffer.wrap(variableSizeDataBytes); + _metadata = new HashMap<>(); + } + + /** + * Construct empty data table. (Server side) + */ + public BaseDataTable() { + _numRows = 0; + _numColumns = 0; + _dataSchema = null; + _columnOffsets = null; + _rowSizeInBytes = 0; + _dictionaryMap = null; + _fixedSizeDataBytes = null; + _fixedSizeData = null; + _variableSizeDataBytes = null; + _variableSizeData = null; + _metadata = new HashMap<>(); + } + + /** + * Helper method to serialize dictionary map. + */ + protected byte[] serializeDictionaryMap(Map<String, Map<Integer, String>> dictionaryMap) Review comment: done ########## File path: pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java ########## @@ -96,22 +131,267 @@ public void testAllDataTypes() for (int i = 0; i < numColumns; i++) { columnNames[i] = columnDataTypes[i].name(); } + + int[] ints = new int[NUM_ROWS]; + long[] longs = new long[NUM_ROWS]; + float[] floats = new float[NUM_ROWS]; + double[] doubles = new double[NUM_ROWS]; + String[] strings = new String[NUM_ROWS]; + byte[][] bytes = new byte[NUM_ROWS][]; + Object[] objects = new Object[NUM_ROWS]; + int[][] intArrays = new int[NUM_ROWS][]; + long[][] longArrays = new long[NUM_ROWS][]; + float[][] floatArrays = new float[NUM_ROWS][]; + double[][] doubleArrays = new double[NUM_ROWS][]; + String[][] stringArrays = new String[NUM_ROWS][]; + DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes); + DataTableBuilder dataTableBuilder = new DataTableBuilder(dataSchema); + fillDataTableWithRandomData(dataTableBuilder, columnDataTypes, numColumns, ints, longs, floats, doubles, strings, + bytes, objects, intArrays, longArrays, floatArrays, doubleArrays, stringArrays); + + DataTable dataTable = dataTableBuilder.build(); + DataTable newDataTable = DataTableFactory.getDataTable(dataTable.toBytes()); + Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); + Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE); + verifyDataIsSame(newDataTable, columnDataTypes, numColumns, ints, longs, floats, doubles, strings, bytes, objects, + intArrays, longArrays, floatArrays, doubleArrays, stringArrays); + } + + @Test + public void testV2V3Compatibility() + throws IOException { + DataSchema.ColumnDataType[] columnDataTypes = DataSchema.ColumnDataType.values(); + int numColumns = columnDataTypes.length; + String[] columnNames = new String[numColumns]; + for (int i = 0; i < numColumns; i++) { + columnNames[i] = columnDataTypes[i].name(); + } + + int[] ints = new int[NUM_ROWS]; + long[] longs = new long[NUM_ROWS]; + float[] floats = new float[NUM_ROWS]; + double[] doubles = new double[NUM_ROWS]; + String[] strings = new String[NUM_ROWS]; + byte[][] bytes = new byte[NUM_ROWS][]; + Object[] objects = new Object[NUM_ROWS]; + int[][] intArrays = new int[NUM_ROWS][]; + long[][] longArrays = new long[NUM_ROWS][]; + float[][] floatArrays = new float[NUM_ROWS][]; + double[][] doubleArrays = new double[NUM_ROWS][]; + String[][] stringArrays = new String[NUM_ROWS][]; + + DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes); + + // Verify V3 broker can deserialize data table (has data, but has no metadata) send by V2 server + DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_2); + DataTableBuilder dataTableBuilderV2WithDataOnly = new DataTableBuilder(dataSchema); + fillDataTableWithRandomData(dataTableBuilderV2WithDataOnly, columnDataTypes, numColumns, ints, longs, floats, + doubles, strings, bytes, objects, intArrays, longArrays, floatArrays, doubleArrays, stringArrays); + + DataTable dataTableV2 = dataTableBuilderV2WithDataOnly.build(); // create a V2 data table + DataTable newDataTable = + DataTableFactory.getDataTable(dataTableV2.toBytes()); // Broker deserialize data table bytes as V2 + Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); + Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE); + verifyDataIsSame(newDataTable, columnDataTypes, numColumns, ints, longs, floats, doubles, strings, bytes, objects, + intArrays, longArrays, floatArrays, doubleArrays, stringArrays); + Assert.assertEquals(newDataTable.getMetadata().size(), 0); + + // Verify V3 broker can deserialize data table (has data and metadata) send by V2 server + for (String key : EXPECTED_METADATA.keySet()) { + dataTableV2.getMetadata().put(key, EXPECTED_METADATA.get(key)); + } + newDataTable = DataTableFactory.getDataTable(dataTableV2.toBytes()); // Broker deserialize data table bytes as V2 + Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); + Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE); + verifyDataIsSame(newDataTable, columnDataTypes, numColumns, ints, longs, floats, doubles, strings, bytes, objects, + intArrays, longArrays, floatArrays, doubleArrays, stringArrays); + Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA); + + // Verify V3 broker can deserialize data table (only has metadata) send by V2 server + DataTableBuilder dataTableBuilderV2WithMetadataDataOnly = new DataTableBuilder(dataSchema); + dataTableV2 = dataTableBuilderV2WithMetadataDataOnly.build(); // create a V2 data table + for (String key : EXPECTED_METADATA.keySet()) { + dataTableV2.getMetadata().put(key, EXPECTED_METADATA.get(key)); + } + newDataTable = DataTableFactory.getDataTable(dataTableV2.toBytes()); // Broker deserialize data table bytes as V2 + Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); + Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0); + Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA); + + // Verify V3 broker can deserialize (has data, but has no metadata) send by V3 server. + DataTableBuilder.setCurrentDataTableVersion(VERSION_3); + DataTableBuilder dataTableBuilderV3WithDataOnly = new DataTableBuilder(dataSchema); + fillDataTableWithRandomData(dataTableBuilderV3WithDataOnly, columnDataTypes, numColumns, ints, longs, floats, + doubles, strings, bytes, objects, intArrays, longArrays, floatArrays, doubleArrays, stringArrays); + DataTable dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create a V3 data table + // Deserialize data table bytes as V3 + newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); + Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); + Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE); + verifyDataIsSame(newDataTable, columnDataTypes, numColumns, ints, longs, floats, doubles, strings, bytes, objects, + intArrays, longArrays, floatArrays, doubleArrays, stringArrays); + // DataTable V3 serialization logic will add an extra THREAD_CPU_TIME_NS KV pair into metadata + Assert.assertEquals(newDataTable.getMetadata().size(), 1); + Assert.assertTrue(newDataTable.getMetadata().containsKey(THREAD_CPU_TIME_NS.getName())); + + // Verify V3 broker can deserialize data table (has data and metadata) send by V3 server + for (String key : EXPECTED_METADATA.keySet()) { + dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key)); + } + newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3 + Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); + Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE); + verifyDataIsSame(newDataTable, columnDataTypes, numColumns, ints, longs, floats, doubles, strings, bytes, objects, + intArrays, longArrays, floatArrays, doubleArrays, stringArrays); + newDataTable.getMetadata().remove(THREAD_CPU_TIME_NS.getName()); + Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA); + // Verify V3 broker can deserialize data table (only has metadata) send by V3 server + DataTableBuilder dataTableBuilderV3WithMetadataDataOnly = new DataTableBuilder(dataSchema); + dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a V2 data table + for (String key : EXPECTED_METADATA.keySet()) { + dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key)); + } + newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V2 + Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); Review comment: fixed in https://github.com/apache/incubator-pinot/pull/6738 ########## File path: pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java ########## @@ -80,4 +82,76 @@ double[] getDoubleArray(int rowId, int colId); String[] getStringArray(int rowId, int colId); + + enum MetadataValueType { + INT, LONG, STRING + } + + /* The MetadataKey is used in V3, where we present metadata as Map<MetadataKey, String> + * ATTENTION: + * - Don't change existing keys. + * - Don't remove existing keys. + * - Always add new keys to the end. + * Otherwise, backward compatibility will be broken. + */ + enum MetadataKey { + UNKNOWN("unknown", MetadataValueType.STRING), + TABLE("table", MetadataValueType.STRING), // NOTE: this key is only used in PrioritySchedulerTest + NUM_DOCS_SCANNED("numDocsScanned", MetadataValueType.LONG), + NUM_ENTRIES_SCANNED_IN_FILTER("numEntriesScannedInFilter", MetadataValueType.LONG), + NUM_ENTRIES_SCANNED_POST_FILTER("numEntriesScannedPostFilter", MetadataValueType.LONG), + NUM_SEGMENTS_QUERIED("numSegmentsQueried", MetadataValueType.INT), + NUM_SEGMENTS_PROCESSED("numSegmentsProcessed", MetadataValueType.INT), + NUM_SEGMENTS_MATCHED("numSegmentsMatched", MetadataValueType.INT), + NUM_CONSUMING_SEGMENTS_PROCESSED("numConsumingSegmentsProcessed", MetadataValueType.INT), + MIN_CONSUMING_FRESHNESS_TIME_MS("minConsumingFreshnessTimeMs", MetadataValueType.LONG), + TOTAL_DOCS("totalDocs", MetadataValueType.LONG), + NUM_GROUPS_LIMIT_REACHED("numGroupsLimitReached", MetadataValueType.STRING), + TIME_USED_MS("timeUsedMs", MetadataValueType.LONG), + TRACE_INFO("traceInfo", MetadataValueType.STRING), + REQUEST_ID("requestId", MetadataValueType.LONG), + NUM_RESIZES("numResizes", MetadataValueType.INT), + RESIZE_TIME_MS("resizeTimeMs", MetadataValueType.LONG), + THREAD_CPU_TIME_NS("threadCpuTimeNs", MetadataValueType.LONG); + + private static final Map<String, MetadataKey> _nameToEnumKeyMap = new HashMap<>(); + private final String _name; + private final MetadataValueType _valueType; + + MetadataKey(String name, MetadataValueType valueType) { + this._name = name; + this._valueType = valueType; + } + + // getByOrdinal returns an optional enum key for a given ordinal or null if the key does not exist. + public static MetadataKey getByOrdinal(int ordinal) { + if (ordinal >= MetadataKey.values().length) { + return null; + } + return MetadataKey.values()[ordinal]; + } + + // getByName returns an enum key for a given name or null if the key does not exist. + public static MetadataKey getByName(String name) { + return _nameToEnumKeyMap.getOrDefault(name, null); + } Review comment: done ########## File path: pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java ########## @@ -80,4 +82,76 @@ double[] getDoubleArray(int rowId, int colId); String[] getStringArray(int rowId, int colId); + + enum MetadataValueType { + INT, LONG, STRING + } + + /* The MetadataKey is used in V3, where we present metadata as Map<MetadataKey, String> + * ATTENTION: + * - Don't change existing keys. + * - Don't remove existing keys. + * - Always add new keys to the end. + * Otherwise, backward compatibility will be broken. + */ + enum MetadataKey { + UNKNOWN("unknown", MetadataValueType.STRING), + TABLE("table", MetadataValueType.STRING), // NOTE: this key is only used in PrioritySchedulerTest + NUM_DOCS_SCANNED("numDocsScanned", MetadataValueType.LONG), + NUM_ENTRIES_SCANNED_IN_FILTER("numEntriesScannedInFilter", MetadataValueType.LONG), + NUM_ENTRIES_SCANNED_POST_FILTER("numEntriesScannedPostFilter", MetadataValueType.LONG), + NUM_SEGMENTS_QUERIED("numSegmentsQueried", MetadataValueType.INT), + NUM_SEGMENTS_PROCESSED("numSegmentsProcessed", MetadataValueType.INT), + NUM_SEGMENTS_MATCHED("numSegmentsMatched", MetadataValueType.INT), + NUM_CONSUMING_SEGMENTS_PROCESSED("numConsumingSegmentsProcessed", MetadataValueType.INT), + MIN_CONSUMING_FRESHNESS_TIME_MS("minConsumingFreshnessTimeMs", MetadataValueType.LONG), + TOTAL_DOCS("totalDocs", MetadataValueType.LONG), + NUM_GROUPS_LIMIT_REACHED("numGroupsLimitReached", MetadataValueType.STRING), + TIME_USED_MS("timeUsedMs", MetadataValueType.LONG), + TRACE_INFO("traceInfo", MetadataValueType.STRING), + REQUEST_ID("requestId", MetadataValueType.LONG), + NUM_RESIZES("numResizes", MetadataValueType.INT), + RESIZE_TIME_MS("resizeTimeMs", MetadataValueType.LONG), + THREAD_CPU_TIME_NS("threadCpuTimeNs", MetadataValueType.LONG); + + private static final Map<String, MetadataKey> _nameToEnumKeyMap = new HashMap<>(); + private final String _name; + private final MetadataValueType _valueType; + + MetadataKey(String name, MetadataValueType valueType) { + this._name = name; + this._valueType = valueType; + } + + // getByOrdinal returns an optional enum key for a given ordinal or null if the key does not exist. + public static MetadataKey getByOrdinal(int ordinal) { Review comment: done ########## File path: pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java ########## @@ -80,4 +82,76 @@ double[] getDoubleArray(int rowId, int colId); String[] getStringArray(int rowId, int colId); + + enum MetadataValueType { + INT, LONG, STRING + } + + /* The MetadataKey is used in V3, where we present metadata as Map<MetadataKey, String> + * ATTENTION: + * - Don't change existing keys. + * - Don't remove existing keys. + * - Always add new keys to the end. + * Otherwise, backward compatibility will be broken. + */ + enum MetadataKey { + UNKNOWN("unknown", MetadataValueType.STRING), + TABLE("table", MetadataValueType.STRING), // NOTE: this key is only used in PrioritySchedulerTest + NUM_DOCS_SCANNED("numDocsScanned", MetadataValueType.LONG), + NUM_ENTRIES_SCANNED_IN_FILTER("numEntriesScannedInFilter", MetadataValueType.LONG), + NUM_ENTRIES_SCANNED_POST_FILTER("numEntriesScannedPostFilter", MetadataValueType.LONG), + NUM_SEGMENTS_QUERIED("numSegmentsQueried", MetadataValueType.INT), + NUM_SEGMENTS_PROCESSED("numSegmentsProcessed", MetadataValueType.INT), + NUM_SEGMENTS_MATCHED("numSegmentsMatched", MetadataValueType.INT), + NUM_CONSUMING_SEGMENTS_PROCESSED("numConsumingSegmentsProcessed", MetadataValueType.INT), + MIN_CONSUMING_FRESHNESS_TIME_MS("minConsumingFreshnessTimeMs", MetadataValueType.LONG), + TOTAL_DOCS("totalDocs", MetadataValueType.LONG), + NUM_GROUPS_LIMIT_REACHED("numGroupsLimitReached", MetadataValueType.STRING), + TIME_USED_MS("timeUsedMs", MetadataValueType.LONG), + TRACE_INFO("traceInfo", MetadataValueType.STRING), + REQUEST_ID("requestId", MetadataValueType.LONG), + NUM_RESIZES("numResizes", MetadataValueType.INT), + RESIZE_TIME_MS("resizeTimeMs", MetadataValueType.LONG), + THREAD_CPU_TIME_NS("threadCpuTimeNs", MetadataValueType.LONG); + + private static final Map<String, MetadataKey> _nameToEnumKeyMap = new HashMap<>(); + private final String _name; + private final MetadataValueType _valueType; + + MetadataKey(String name, MetadataValueType valueType) { + this._name = name; + this._valueType = valueType; Review comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
