lnbest0707-uber commented on code in PR #12788: URL: https://github.com/apache/pinot/pull/12788#discussion_r1554805805
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2.java: ########## @@ -0,0 +1,715 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.recordtransformer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.base.Preconditions; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.apache.pinot.common.metrics.ServerGauge; +import org.apache.pinot.common.metrics.ServerMeter; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.segment.local.utils.Base64Utils; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerV2Config; +import org.apache.pinot.spi.data.DimensionFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.FieldSpec.DataType; 
+import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.metrics.PinotMeter; +import org.apache.pinot.spi.stream.StreamDataDecoderImpl; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This transformer evolves from {@link SchemaConformingTransformer} and is designed to support extra cases for + * better text searching: + * - Support over-lapping schema fields, in which case it could support schema column "a" and "a.b" at the same time. + * And it only allows primitive type fields to be the value. + * - Extract flattened key-value pairs as mergedTextIndex for better text searching. + * - Add shingle index tokenization functionality for extremely large text fields. + * <p> + * For example, consider this record: + * <pre> + * { + * "a": 1, + * "b": "2", + * "c": { + * "d": 3, + * "e_noindex": 4, + * "f_noindex": { + * "g": 5 + * }, + * "x": { + * "y": 9, + * "z_noindex": 10 + * } + * } + * "h_noindex": "6", + * "i_noindex": { + * "j": 7, + * "k": 8 + * } + * } + * </pre> + * And let's say the table's schema contains these fields: + * <ul> + * <li>a</li> + * <li>c</li> + * <li>c.d</li> + * </ul> + * <p> + * The record would be transformed into the following (refer to {@link SchemaConformingTransformerV2Config} for + * * default constant values): + * <pre> + * { + * "a": 1, + * "c": null, + * "c.d": 3, + * "json_data": { + * "b": "2", + * "c": { + * "x": { + * "y": 9 + * } + * } + * } + * "json_data_no_idx": { + * "c": { + * "e_noindex": 4, + * "f_noindex": { + * "g": 5 + * }, + * "x": { + * "z_noindex": 10 + * } + * }, + * "h_noindex": "6", + * "i_noindex": { + * "j": 7, + * "k": 8 + * } + * }, + * "__mergedTextIndex": [ + * "1:a", "2:b", "3:c.d", "9:c.x.y" + * ] + * } + * </pre> + * <p> + * The "__mergedTextIndex" could filter and manipulate the data based on the configuration in + * {@link SchemaConformingTransformerV2Config}. 
+ */ +public class SchemaConformingTransformerV2 implements RecordTransformer { + private static final Logger _logger = LoggerFactory.getLogger(SchemaConformingTransformerV2.class); + private static final int MAXIMUM_LUCENE_TOKEN_SIZE = 32766; + private static final String MIN_TOKEN_LENGTH_DESCRIPTION = + "key length + `:` + shingle index overlap length + one non-overlap char"; + + private final boolean _continueOnError; + private final SchemaConformingTransformerV2Config _transformerConfig; + private final DataType _indexableExtrasFieldType; + private final DataType _unindexableExtrasFieldType; + private final DimensionFieldSpec _mergedTextIndexFieldSpec; + @Nullable + ServerMetrics _serverMetrics = null; + private SchemaTreeNode _schemaTree; + @Nullable + private PinotMeter _realtimeMergedTextIndexTruncatedTokenSizeMeter = null; + private String _tableName; + private long _mergedTextIndexTokenBytesCount = 0L; + private long _mergedTextIndexTokenCount = 0L; + + public SchemaConformingTransformerV2(TableConfig tableConfig, Schema schema) { + if (null == tableConfig.getIngestionConfig() || null == tableConfig.getIngestionConfig() + .getSchemaConformingTransformerV2Config()) { + _continueOnError = false; + _transformerConfig = null; + _indexableExtrasFieldType = null; + _unindexableExtrasFieldType = null; + _mergedTextIndexFieldSpec = null; + return; + } + + _continueOnError = tableConfig.getIngestionConfig().isContinueOnError(); + _transformerConfig = tableConfig.getIngestionConfig().getSchemaConformingTransformerV2Config(); + String indexableExtrasFieldName = _transformerConfig.getIndexableExtrasField(); + _indexableExtrasFieldType = + indexableExtrasFieldName == null ? null : SchemaConformingTransformer.getAndValidateExtrasFieldType(schema, + indexableExtrasFieldName); + String unindexableExtrasFieldName = _transformerConfig.getUnindexableExtrasField(); + _unindexableExtrasFieldType = + null == unindexableExtrasFieldName ? 
null : SchemaConformingTransformer.getAndValidateExtrasFieldType(schema, + unindexableExtrasFieldName); + _mergedTextIndexFieldSpec = schema.getDimensionSpec(_transformerConfig.getMergedTextIndexField()); + _tableName = tableConfig.getTableName(); + _schemaTree = validateSchemaAndCreateTree(schema, _transformerConfig); + _serverMetrics = ServerMetrics.get(); + } + + /** + * Validates the schema against the given transformer's configuration. + */ + public static void validateSchema(@Nonnull Schema schema, + @Nonnull SchemaConformingTransformerV2Config transformerConfig) { + validateSchemaFieldNames(schema.getPhysicalColumnNames(), transformerConfig); + + String indexableExtrasFieldName = transformerConfig.getIndexableExtrasField(); + if (null != indexableExtrasFieldName) { + SchemaConformingTransformer.getAndValidateExtrasFieldType(schema, indexableExtrasFieldName); + } + String unindexableExtrasFieldName = transformerConfig.getUnindexableExtrasField(); + if (null != unindexableExtrasFieldName) { + SchemaConformingTransformer.getAndValidateExtrasFieldType(schema, unindexableExtrasFieldName); + } + + validateSchemaAndCreateTree(schema, transformerConfig); + } + + /** + * Heuristic filter to detect whether a byte array is longer than a specified length and contains only base64 + * characters so that we treat it as encoded binary data. + * @param bytes array to check + * @param minLength byte array shorter than this length will not be treated as encoded binary data + * @return true if the input bytes is base64 encoded binary data by the heuristic above, false otherwise + */ + public static boolean base64ValueFilter(final byte[] bytes, int minLength) { + return bytes.length >= minLength && Base64Utils.isBase64IgnoreTrailingPeriods(bytes); + } + + /** + * Validates that none of the schema fields have names that conflict with the transformer's configuration. 
+ */ + private static void validateSchemaFieldNames(Set<String> schemaFields, + SchemaConformingTransformerV2Config transformerConfig) { + // Validate that none of the columns in the schema end with unindexableFieldSuffix + String unindexableFieldSuffix = transformerConfig.getUnindexableFieldSuffix(); + if (null != unindexableFieldSuffix) { + for (String field : schemaFields) { + Preconditions.checkState(!field.endsWith(unindexableFieldSuffix), "Field '%s' has no-index suffix '%s'", field, + unindexableFieldSuffix); + } + } + + // Validate that none of the columns in the schema end overlap with the fields in fieldPathsToDrop + Set<String> fieldPathsToDrop = transformerConfig.getFieldPathsToDrop(); + if (null != fieldPathsToDrop) { + Set<String> fieldIntersection = new HashSet<>(schemaFields); + fieldIntersection.retainAll(fieldPathsToDrop); + Preconditions.checkState(fieldIntersection.isEmpty(), "Fields in schema overlap with fieldPathsToDrop"); + } + } + + /** + * Validates the schema with a SchemaConformingTransformerConfig instance and creates a tree representing the fields + * in the schema to be used when transforming input records. Refer to {@link SchemaTreeNode} for details. + * @throws IllegalArgumentException if schema validation fails in: + * <ul> + * <li>One of the fields in the schema has a name which when interpreted as a JSON path, corresponds to an object + * with an empty sub-key. 
E.g., the field name "a..b" corresponds to the JSON {"a": {"": {"b": ...}}}</li> + * </ul> + */ + private static SchemaTreeNode validateSchemaAndCreateTree(@Nonnull Schema schema, + @Nonnull SchemaConformingTransformerV2Config transformerConfig) + throws IllegalArgumentException { + Set<String> schemaFields = schema.getPhysicalColumnNames(); + Map<String, String> jsonKeyPathToColumnNameMap = new HashMap<>(); + for (Map.Entry<String, String> entry : transformerConfig.getColumnNameToJsonKeyPathMap().entrySet()) { + String columnName = entry.getKey(); + String jsonKeyPath = entry.getValue(); + schemaFields.remove(columnName); + schemaFields.add(jsonKeyPath); + jsonKeyPathToColumnNameMap.put(jsonKeyPath, columnName); + } + + SchemaTreeNode rootNode = new SchemaTreeNode("", null, schema); + List<String> subKeys = new ArrayList<>(); + for (String field : schemaFields) { + SchemaTreeNode currentNode = rootNode; + int keySeparatorIdx = field.indexOf(JsonUtils.KEY_SEPARATOR); + if (-1 == keySeparatorIdx) { + // Not a flattened key + currentNode = rootNode.getAndCreateChild(field, schema); + } else { + subKeys.clear(); + SchemaConformingTransformer.getAndValidateSubKeys(field, keySeparatorIdx, subKeys); + for (String subKey : subKeys) { + SchemaTreeNode childNode = currentNode.getAndCreateChild(subKey, schema); + currentNode = childNode; + } + } + currentNode.setColumn(jsonKeyPathToColumnNameMap.get(field)); + } + + return rootNode; + } + + @Override + public boolean isNoOp() { + return null == _transformerConfig; + } + + @Nullable + @Override + public GenericRow transform(GenericRow record) { + GenericRow outputRecord = new GenericRow(); + Map<String, Object> mergedTextIndexMap = new HashMap<>(); + + try { + Deque<String> jsonPath = new ArrayDeque<>(); + ExtraFieldsContainer extraFieldsContainer = + new ExtraFieldsContainer(null != _transformerConfig.getUnindexableExtrasField()); + for (Map.Entry<String, Object> recordEntry : record.getFieldToValueMap().entrySet()) { + 
String recordKey = recordEntry.getKey(); + Object recordValue = recordEntry.getValue(); + jsonPath.addLast(recordKey); + ExtraFieldsContainer currentFieldsContainer = + processField(_schemaTree, jsonPath, recordValue, true, outputRecord, mergedTextIndexMap); + extraFieldsContainer.addChild(currentFieldsContainer); + jsonPath.removeLast(); + } + putExtrasField(_transformerConfig.getIndexableExtrasField(), _indexableExtrasFieldType, + extraFieldsContainer.getIndexableExtras(), outputRecord); + putExtrasField(_transformerConfig.getUnindexableExtrasField(), _unindexableExtrasFieldType, + extraFieldsContainer.getUnindexableExtras(), outputRecord); + + // Generate merged text index + if (null != _mergedTextIndexFieldSpec && !mergedTextIndexMap.isEmpty()) { + List<String> luceneTokens = getLuceneTokensFromMergedTextIndexMap(mergedTextIndexMap); + if (_mergedTextIndexFieldSpec.isSingleValueField()) { + outputRecord.putValue(_transformerConfig.getMergedTextIndexField(), String.join(" ", luceneTokens)); + } else { + outputRecord.putValue(_transformerConfig.getMergedTextIndexField(), luceneTokens); + } + } + } catch (Exception e) { + if (!_continueOnError) { + throw e; + } + _logger.error("Couldn't transform record: {}", record.toString(), e); + outputRecord.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true); + } + + return outputRecord; + } + + /** + * The class traverses the schema tree and the record accordingly to build the output record. + * Taking example: + * { + * "a": 1, + * "b": { + * "c": 2, + * "d": 3, + * "d_noIdx": 4 + * } + * "b_noIdx": { + * "c": 5, + * "d": 6, + * } + * } + * with column "a", "b", "b.c" in schema + * There are two types of output: + * - flattened keys with values, e.g., + * - keyPath as column and value as leaf node, e.g., "a": 1, "b.c": 2. 
However, "b" is not a leaf node, so it would + * be skipped + * - __mergedTextIndex storing ["1:a", "2:b.c", "3:b.d"] as a string array + * - structured Json format, e.g., + * - indexableFields/json_data: {"a": 1, "b": {"c": 2, "d": 3}} + * - unindexableFields/json_data_noIdx: {"b": {"d_noIdx": 4} ,"b_noIdx": {"c": 5, "d": 6}} + * Expected behavior: + * - If the current key is special, it would be added to the outputRecord and skip subtree + * - If the keyJsonPath is in fieldPathsToDrop, it and its subtree would be skipped + * - At leaf node (base case in recursion): + * - Parse keyPath and value and add as flattened result to outputRecord + * - Return structured fields as ExtraFieldsContainer + * - For non-leaf node + * - Construct ExtraFieldsContainer based on children's result and return + * + * @param parentNode The parent node in the schema tree which might or might not have a child with the given key. If + * parentNode is null, it means the current key is out of the schema tree. + * @param jsonPath The key json path split by "." 
+ * @param value The value of the current field + * @param isIndexable Whether the current field is indexable + * @param outputRecord The output record updated during traverse + * @param mergedTextIndexMap The merged text index map updated during traverse + * @return ExtraFieldsContainer carries the indexable and unindexable fields of the current node as well as its + * subtree + */ + private ExtraFieldsContainer processField(SchemaTreeNode parentNode, Deque<String> jsonPath, Object value, + boolean isIndexable, GenericRow outputRecord, Map<String, Object> mergedTextIndexMap) { + // Common variables + boolean storeIndexableExtras = _transformerConfig.getIndexableExtrasField() != null; + boolean storeUnindexableExtras = _transformerConfig.getUnindexableExtrasField() != null; + String key = jsonPath.peekLast(); + ExtraFieldsContainer extraFieldsContainer = new ExtraFieldsContainer(storeUnindexableExtras); + + // Base case + if (StreamDataDecoderImpl.isSpecialKeyType(key) || GenericRow.isSpecialKeyType(key)) { + outputRecord.putValue(key, value); + return extraFieldsContainer; + } + + String keyJsonPath = String.join(".", jsonPath); + Set<String> fieldPathsToDrop = _transformerConfig.getFieldPathsToDrop(); + if (null != fieldPathsToDrop && fieldPathsToDrop.contains(keyJsonPath)) { + return extraFieldsContainer; + } + + SchemaTreeNode currentNode = parentNode == null ? 
null : parentNode.getChild(key); + String unindexableFieldSuffix = _transformerConfig.getUnindexableFieldSuffix(); + isIndexable = isIndexable && (null == unindexableFieldSuffix || !key.endsWith(unindexableFieldSuffix)); + if (!(value instanceof Map)) { + // leaf node + if (!isIndexable) { + extraFieldsContainer.addUnindexableEntry(key, value); + } else { + if (null != currentNode && currentNode.isColumn()) { + // In schema + outputRecord.putValue(currentNode.getColumnName(), currentNode.getValue(value)); + if (_transformerConfig.getFieldsToDoubleIngest().contains(keyJsonPath)) { + extraFieldsContainer.addIndexableEntry(key, value); + } + mergedTextIndexMap.put(keyJsonPath, value); + } else { + // Out of schema + if (storeIndexableExtras) { + extraFieldsContainer.addIndexableEntry(key, value); + mergedTextIndexMap.put(keyJsonPath, value); + } + } + } + return extraFieldsContainer; + } + // Traverse the subtree + Map<String, Object> valueAsMap = (Map<String, Object>) value; + for (Map.Entry<String, Object> entry : valueAsMap.entrySet()) { + jsonPath.addLast(entry.getKey()); + ExtraFieldsContainer childContainer = + processField(currentNode, jsonPath, entry.getValue(), isIndexable, outputRecord, mergedTextIndexMap); + extraFieldsContainer.addChild(key, childContainer); + jsonPath.removeLast(); + } + return extraFieldsContainer; + } + + /** + * Generate an index token based on the provided key-value pair. + * The index token follows this format: "val:key". 
+ * @param kv used to generate text index tokens + * @param indexTokens a list to store the generated index tokens + * @param mergedTextIndexTokenMaxLength which we enforce via truncation during token generation + */ + public void generateTextIndexToken(Map.Entry<String, Object> kv, List<String> indexTokens, + Integer mergedTextIndexTokenMaxLength) { + String key = kv.getKey(); + String val; + // To avoid redundant leading and tailing '"', only convert to JSON string if the value is a list or an array + if (kv.getValue() instanceof Collection || kv.getValue() instanceof Object[]) { + try { + val = JsonUtils.objectToString(kv.getValue()); + } catch (JsonProcessingException e) { + val = kv.getValue().toString(); + } + } else { + val = kv.getValue().toString(); + } + + if (key.length() + 1 > MAXIMUM_LUCENE_TOKEN_SIZE) { + _logger.error("The provided key's length is too long, text index token cannot be truncated"); + return; + } + + // Truncate the value to ensure the generated index token is less or equal to mergedTextIndexTokenMaxLength + // The value length should be the mergedTextIndexTokenMaxLength minus ":" character (length 1) minus key length + int valueTruncationLength = mergedTextIndexTokenMaxLength - 1 - key.length(); + if (val.length() > valueTruncationLength) { + _realtimeMergedTextIndexTruncatedTokenSizeMeter = _serverMetrics + .addMeteredTableValue(_tableName, ServerMeter.REALTIME_MERGED_TEXT_IDX_TRUNCATED_TOKEN_SIZE, + key.length() + 1 + val.length(), _realtimeMergedTextIndexTruncatedTokenSizeMeter); + val = val.substring(0, valueTruncationLength); + } + + _mergedTextIndexTokenBytesCount += key.length() + 1 + val.length(); + _mergedTextIndexTokenCount += 1; + _serverMetrics.setValueOfTableGauge(_tableName, ServerGauge.REALTIME_MERGED_TEXT_IDX_TOKEN_AVG_LEN, + _mergedTextIndexTokenBytesCount / _mergedTextIndexTokenCount); + + indexTokens.add(val + ":" + key); + } + + /** + * Implement shingling for the merged text index based on the provided key-value 
pair. + * Each shingled index token retains the format of a standard index token: "val:key". However, "val" now denotes a + * sliding window of characters on the value. The total length of each shingled index token + * (key length + shingled value length + 1)must be less than or equal to shingleIndexMaxLength. The starting index + * of the sliding window for the value is increased by shinglingOverlapLength for every new shingled token. + * All shingle index tokens, except for the last one, should have the maximum possible length. If the minimum token + * length (shingling overlap length + key length + 1) exceeds the maximum Lucene token size + * (MAXIMUM_LUCENE_TOKEN_SIZE), shingling is disabled, and the value is truncated to match the maximum Lucene token + * size. If shingleIndexMaxLength is lower than the required minimum token length and also lower than the maximum + * Lucene token size, shingleIndexMaxLength is adjusted to match the maximum Lucene token size. + * + * Note that the most important parameter, the shingleIndexOverlapLength, is the maximum search length that will yield + * results with 100% accuracy. + * + * Example: key-> "key", value-> "0123456789ABCDEF", max length: 10, shingling overlap length: 3 + * Generated tokens: + * 012345:key + * 345678:key + * 6789AB:key + * 9ABCDE:key + * CDEF:key + * Any query with a length of 7 will yield no results, such as "0123456" or "6789ABC". + * Any query with a length of 3 will yield results with 100% accuracy (i.e. is always guaranteed to be searchable). + * Any query with a length between 4 and 6 (inclusive) has indeterminate accuracy. + * E.g. for queries with length 5, "12345", "789AB" will hit, while "23456" will miss. + * + * @param kv used to generate shingle text index tokens + * @param shingleIndexTokens a list to store the generated shingle index tokens + * @param shingleIndexMaxLength the maximum length of each shingle index token. 
Needs to be greater than the + * length of the key and shingleIndexOverlapLength + 1, and must be lower or equal + * to MAXIMUM_LUCENE_TOKEN_SIZE. + * @param shingleIndexOverlapLength the number of characters in the kv-pair's value shared by two adjacent shingle + * index tokens. If null, the overlap length will be defaulted to half of the max + * token length. + */ + public void generateShingleTextIndexToken(Map.Entry<String, Object> kv, List<String> shingleIndexTokens, + int shingleIndexMaxLength, int shingleIndexOverlapLength) { + String key = kv.getKey(); + String val; + // To avoid redundant leading and tailing '"', only convert to JSON string if the value is a list or an array + if (kv.getValue() instanceof Collection || kv.getValue() instanceof Object[]) { + try { + val = JsonUtils.objectToString(kv.getValue()); + } catch (JsonProcessingException e) { + val = kv.getValue().toString(); + } + } else { + val = kv.getValue().toString(); + } + final int valLength = val.length(); + final int tokenSuffixLength = key.length() + 1; + final int minTokenLength = tokenSuffixLength + shingleIndexOverlapLength + 1; + + if (shingleIndexOverlapLength >= valLength) { + if (_logger.isDebugEnabled()) { + _logger.warn("The shingleIndexOverlapLength " + shingleIndexOverlapLength + " is longer than the value length " + + valLength + ". Shingling will not be applied since only one token will be generated."); + } + generateTextIndexToken(kv, shingleIndexTokens, shingleIndexMaxLength); + return; + } + + if (minTokenLength > MAXIMUM_LUCENE_TOKEN_SIZE) { + _logger.debug("The minimum token length " + minTokenLength + " (" + MIN_TOKEN_LENGTH_DESCRIPTION + ") exceeds " + + "the limit of maximum Lucene token size " + MAXIMUM_LUCENE_TOKEN_SIZE + ". 
Value will be truncated and " + + "shingling will not be applied."); + generateTextIndexToken(kv, shingleIndexTokens, shingleIndexMaxLength); + return; + } + + // This logging becomes expensive if user accidentally sets a very low shingleIndexMaxLength + if (shingleIndexMaxLength < minTokenLength) { + _logger.debug("The shingleIndexMaxLength " + shingleIndexMaxLength + " is smaller than the minimum token length " + + minTokenLength + " (" + MIN_TOKEN_LENGTH_DESCRIPTION + "). Increasing the shingleIndexMaxLength to " + + "maximum Lucene token size " + MAXIMUM_LUCENE_TOKEN_SIZE + "."); + shingleIndexMaxLength = MAXIMUM_LUCENE_TOKEN_SIZE; + } + + // Shingle window slide length is the index position on the value which we shall advance on every iteration. + // We ensure shingleIndexMaxLength >= minTokenLength so that shingleWindowSlideLength >= 1. + int shingleWindowSlideLength = shingleIndexMaxLength - shingleIndexOverlapLength - tokenSuffixLength; + + // Generate shingle index tokens + // When starting_idx + shingleIndexOverlapLength >= valLength, there are no new characters to capture, then we stop + // the shingle token generation loop. + // We ensure that shingleIndexOverlapLength < valLength so that this loop will be entered at least once. 
+ for (int i = 0; i + shingleIndexOverlapLength < valLength; i += shingleWindowSlideLength) { + String tokenValStr = val.substring(i, Math.min(i + shingleIndexMaxLength - tokenSuffixLength, valLength)); + String shingleIndexToken = tokenValStr + ":" + key; + shingleIndexTokens.add(shingleIndexToken); + _mergedTextIndexTokenBytesCount += shingleIndexToken.length(); + ++_mergedTextIndexTokenCount; + } + _serverMetrics.setValueOfTableGauge(_tableName, ServerGauge.REALTIME_MERGED_TEXT_IDX_TOKEN_AVG_LEN, + _mergedTextIndexTokenBytesCount / _mergedTextIndexTokenCount); + } + + /** + * Converts (if necessary) and adds the given extras field to the output record + */ + private void putExtrasField(String fieldName, DataType fieldType, Map<String, Object> field, + GenericRow outputRecord) { + if (null == field) { + return; + } + + switch (fieldType) { + case JSON: + outputRecord.putValue(fieldName, field); + break; + case STRING: + try { + outputRecord.putValue(fieldName, JsonUtils.objectToString(field)); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to convert '" + fieldName + "' to string", e); + } + break; + default: + throw new UnsupportedOperationException("Cannot convert '" + fieldName + "' to " + fieldType.name()); + } + } + + private List<String> getLuceneTokensFromMergedTextIndexMap(Map<String, Object> mergedTextIndexMap) { + final Integer mergedTextIndexTokenMaxLength = _transformerConfig.getMergedTextIndexTokenMaxLength(); + final @Nullable + Integer mergedTextIndexShinglingOverlapLength = _transformerConfig.getMergedTextIndexShinglingOverlapLength(); + List<String> luceneTokens = new ArrayList<>(); + mergedTextIndexMap.entrySet().stream().filter(kv -> null != kv.getKey() && null != kv.getValue()) + .filter(kv -> !_transformerConfig.getMergedTextIndexPathToExclude().contains(kv.getKey())).filter( + kv -> !base64ValueFilter(kv.getValue().toString().getBytes(), + 
_transformerConfig.getMergedTextIndexBinaryTokenDetectionMinLength())).filter( + kv -> _transformerConfig.getMergedTextIndexSuffixToExclude().stream() + .noneMatch(suffix -> kv.getKey().endsWith(suffix))).forEach(kv -> { + if (null == mergedTextIndexShinglingOverlapLength) { + generateTextIndexToken(kv, luceneTokens, mergedTextIndexTokenMaxLength); + } else { + generateShingleTextIndexToken(kv, luceneTokens, mergedTextIndexTokenMaxLength, + mergedTextIndexShinglingOverlapLength); + } + }); + return luceneTokens; + } +} + +/** + * SchemaTreeNode represents the tree node when we construct the schema tree. The node could be either leaf node or + * non-leaf node. Both types of node could hold a column in the schema. + * For example, the schema with columns a, b, c, d.e, d.f, x.y, x.y.z, x.y.w will have the following tree structure: + * root -- a* + * -- b* + * -- c* + * -- d -- e* + * -- f* + * -- x* -- y* -- z* + * -- w* + * where node with "*" could represent a valid column in the schema. + */ +class SchemaTreeNode { + private boolean _isColumn; Review Comment: These 2 terms are both used in Pinot documents, so I assume both as acceptable names. Keep them as is for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
