This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new aaf65b1aab8 Remove unnecessary string conversion in
SanitizationTransformer (#17778)
aaf65b1aab8 is described below
commit aaf65b1aab8a2a642d9ff2c8b33e245b41ec0d06
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Feb 27 13:31:00 2026 -0800
Remove unnecessary string conversion in SanitizationTransformer (#17778)
---
.../recordtransformer/SanitizationTransformer.java | 49 +++++++----------
.../recordtransformer/RecordTransformerTest.java | 63 ++++++++++++----------
2 files changed, 54 insertions(+), 58 deletions(-)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java
index 640975a2e08..2c0abae691e 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java
@@ -31,25 +31,23 @@ import
org.apache.pinot.spi.recordtransformer.RecordTransformer;
import org.apache.pinot.spi.utils.StringUtil;
-/**
- * The {@code SanitizationTransformer} class will sanitize the values to
follow certain rules including:
- * <ul>
- * <li>No {@code null} characters in string values</li>
- * <li>String values are within the length limit</li>
- * </ul>
- * <p>NOTE: should put this after the {@link DataTypeTransformer} so that all
values follow the data types in
- * {@link FieldSpec}.
- * This uses the MaxLengthExceedStrategy in the {@link FieldSpec} to decide
what to do when the value exceeds the max.
- * For TRIM_LENGTH, the value is trimmed to the max length.
- * For SUBSTITUTE_DEFAULT_VALUE, the value is replaced with the default null
value string.
- * For ERROR, an exception is thrown and the record is skipped.
- * For NO_ACTION, the value is kept as is if no NULL_CHARACTER present else
trimmed till NULL.
- * In the first 2 scenarios, this metric REALTIME_ROWS_SANITIZED can be
tracked to know if a trimmed /
- * default record was persisted.
- * In the third scenario, this metric ROWS_WITH_ERRORS can be tracked to know
if a record was skipped.
- * In the last scenario, this metric REALTIME_ROWS_SANITIZED can be tracked to
know if a record was trimmed
- * due to having a null character.
- */
+/// The `SanitizationTransformer` class will sanitize the values to follow
certain rules including:
+/// - No `null` characters in string values
+/// - String/bytes values are within the length limit
+///
+/// NOTE: should put this after the [DataTypeTransformer] so that all values
follow the data types in [FieldSpec].
+/// This uses the `MaxLengthExceedStrategy` in the [FieldSpec] to decide what
to do when the value exceeds the max
+/// length:
+/// - TRIM_LENGTH: Trim value to the max length
+/// - SUBSTITUTE_DEFAULT_VALUE: Replace value with the default null value
+/// - ERROR: Throw exception when value doesn't conform to the rules
+/// - NO_ACTION: Keep the value as is if no `NULL_CHARACTER` is present, else
trim till `NULL_CHARACTER`
+///
+/// In the first 2 scenarios, the metric `REALTIME_ROWS_SANITIZED` can be
tracked to know if a trimmed / default record
+/// was persisted.
+/// In the third scenario, the metric `ROWS_WITH_ERRORS` can be tracked to
know if a record was skipped.
+/// In the last scenario, the metric `REALTIME_ROWS_SANITIZED` can be tracked
to know if a record was trimmed due to
+/// having a `NULL_CHARACTER`.
public class SanitizationTransformer implements RecordTransformer {
private static final String NULL_CHARACTER = "\0";
private final Map<String, SanitizedColumnInfo> _columnToColumnInfoMap = new
HashMap<>();
@@ -62,8 +60,7 @@ public class SanitizationTransformer implements
RecordTransformer {
MaxLengthExceedStrategy strategy =
fieldSpec.getEffectiveMaxLengthExceedStrategy();
if (dataType == DataType.STRING || strategy !=
MaxLengthExceedStrategy.NO_ACTION) {
_columnToColumnInfoMap.put(fieldSpec.getName(),
- new SanitizedColumnInfo(fieldSpec.getName(),
fieldSpec.getEffectiveMaxLength(), strategy,
- fieldSpec.getDefaultNullValue()));
+ new SanitizedColumnInfo(fieldSpec.getEffectiveMaxLength(),
strategy, fieldSpec.getDefaultNullValue()));
}
}
}
@@ -132,7 +129,7 @@ public class SanitizationTransformer implements
RecordTransformer {
case TRIM_LENGTH:
return Pair.of(sanitizedValue, true);
case SUBSTITUTE_DEFAULT_VALUE:
- return
Pair.of(FieldSpec.getStringValue(sanitizedColumnInfo.getDefaultNullValue()),
true);
+ return Pair.of((String) sanitizedColumnInfo.getDefaultNullValue(),
true);
case ERROR:
index = value.indexOf(NULL_CHARACTER);
if (index < 0) {
@@ -190,23 +187,17 @@ public class SanitizationTransformer implements
RecordTransformer {
}
private static class SanitizedColumnInfo {
- private final String _columnName;
private final int _maxLength;
private final MaxLengthExceedStrategy _maxLengthExceedStrategy;
private final Object _defaultNullValue;
- private SanitizedColumnInfo(String columnName, int maxLength,
MaxLengthExceedStrategy maxLengthExceedStrategy,
+ private SanitizedColumnInfo(int maxLength, MaxLengthExceedStrategy
maxLengthExceedStrategy,
Object defaultNullValue) {
- _columnName = columnName;
_maxLength = maxLength;
_maxLengthExceedStrategy = maxLengthExceedStrategy;
_defaultNullValue = defaultNullValue;
}
- public String getColumnName() {
- return _columnName;
- }
-
public int getMaxLength() {
return _maxLength;
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
index a790883bce0..0cc0def4856 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
@@ -42,7 +42,6 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.recordtransformer.RecordTransformer;
-import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.Test;
@@ -129,6 +128,13 @@ public class RecordTransformerTest {
return record;
}
+ private static GenericRow getTypeConformingRecord() {
+ DataTypeTransformer dataTypeTransformer = new
DataTypeTransformer(TABLE_CONFIG, SCHEMA);
+ GenericRow record = getRecord();
+ dataTypeTransformer.transform(record);
+ return record;
+ }
+
@Test
public void testFilterTransformer() {
IngestionConfig ingestionConfig = new IngestionConfig();
@@ -255,7 +261,7 @@ public class RecordTransformerTest {
ingestionConfig.setRowTimeValueCheck(true);
tableConfig.setIngestionConfig(ingestionConfig);
RecordTransformer transformerWithValidation = new
TimeValidationTransformer(tableConfig, schema);
- GenericRow record1 = getRecord();
+ GenericRow record1 = getTypeConformingRecord();
record1.putValue(timeCol, 1L);
for (int i = 0; i < NUM_ROUNDS; i++) {
assertThrows(() -> transformerWithValidation.transform(record1));
@@ -264,7 +270,7 @@ public class RecordTransformerTest {
// Invalid timestamp, validation enabled and ignoreErrors enabled
ingestionConfig.setContinueOnError(true);
transformer = new TimeValidationTransformer(tableConfig, schema);
- GenericRow record2 = getRecord();
+ GenericRow record2 = getTypeConformingRecord();
record2.putValue(timeCol, 1L);
for (int i = 0; i < NUM_ROUNDS; i++) {
transformer.transform(record2);
@@ -274,7 +280,7 @@ public class RecordTransformerTest {
// Valid timestamp, validation enabled
ingestionConfig.setContinueOnError(false);
transformer = new TimeValidationTransformer(tableConfig, schema);
- GenericRow record3 = getRecord();
+ GenericRow record3 = getTypeConformingRecord();
Long currentTimeMillis = System.currentTimeMillis();
record3.putValue(timeCol, currentTimeMillis);
for (int i = 0; i < NUM_ROUNDS; i++) {
@@ -288,7 +294,7 @@ public class RecordTransformerTest {
// scenario where string contains null and exceeds max length
// and fieldSpec maxLengthExceedStrategy is default (TRIM_LENGTH)
RecordTransformer transformer = new SanitizationTransformer(SCHEMA);
- GenericRow record = getRecord();
+ GenericRow record = getTypeConformingRecord();
for (int i = 0; i < NUM_ROUNDS; i++) {
transformer.transform(record);
assertEquals(record.getValue("svStringWithNullCharacters"), "1");
@@ -309,7 +315,7 @@ public class RecordTransformerTest {
FieldSpec svStringWithNullCharacters =
schema.getFieldSpecFor("svStringWithNullCharacters");
svStringWithNullCharacters.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.ERROR);
transformer = new SanitizationTransformer(schema);
- record = getRecord();
+ record = getTypeConformingRecord();
for (int i = 0; i < NUM_ROUNDS; i++) {
try {
transformer.transform(record);
@@ -323,7 +329,7 @@ public class RecordTransformerTest {
// scenario where string contains null and fieldSpec
maxLengthExceedStrategy is to SUBSTITUTE_DEFAULT_VALUE
svStringWithNullCharacters.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.SUBSTITUTE_DEFAULT_VALUE);
transformer = new SanitizationTransformer(schema);
- record = getRecord();
+ record = getTypeConformingRecord();
for (int i = 0; i < NUM_ROUNDS; i++) {
transformer.transform(record);
assertEquals(record.getValue("svStringWithNullCharacters"), "null");
@@ -333,7 +339,7 @@ public class RecordTransformerTest {
// scenario where string contains null and fieldSpec
maxLengthExceedStrategy is to NO_ACTION
svStringWithNullCharacters.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.NO_ACTION);
transformer = new SanitizationTransformer(schema);
- record = getRecord();
+ record = getTypeConformingRecord();
for (int i = 0; i < NUM_ROUNDS; i++) {
transformer.transform(record);
assertEquals(record.getValue("svStringWithNullCharacters"), "1");
@@ -347,7 +353,7 @@ public class RecordTransformerTest {
// scenario where string exceeds max length and fieldSpec
maxLengthExceedStrategy is to ERROR
svStringWithLengthLimit.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.ERROR);
transformer = new SanitizationTransformer(schema);
- record = getRecord();
+ record = getTypeConformingRecord();
for (int i = 0; i < NUM_ROUNDS; i++) {
try {
transformer.transform(record);
@@ -361,7 +367,7 @@ public class RecordTransformerTest {
// scenario where string exceeds max length and fieldSpec
maxLengthExceedStrategy is to SUBSTITUTE_DEFAULT_VALUE
svStringWithLengthLimit.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.SUBSTITUTE_DEFAULT_VALUE);
transformer = new SanitizationTransformer(schema);
- record = getRecord();
+ record = getTypeConformingRecord();
for (int i = 0; i < NUM_ROUNDS; i++) {
transformer.transform(record);
assertEquals(record.getValue("svStringWithLengthLimit"), "null");
@@ -371,7 +377,7 @@ public class RecordTransformerTest {
// scenario where string exceeds max length and fieldSpec
maxLengthExceedStrategy is to NO_ACTION
svStringWithLengthLimit.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.NO_ACTION);
transformer = new SanitizationTransformer(schema);
- record = getRecord();
+ record = getTypeConformingRecord();
for (int i = 0; i < NUM_ROUNDS; i++) {
transformer.transform(record);
assertEquals(record.getValue("svStringWithLengthLimit"), "123");
@@ -386,27 +392,27 @@ public class RecordTransformerTest {
svJson.setMaxLength(10);
svJson.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.NO_ACTION);
transformer = new SanitizationTransformer(schema);
- record = getRecord();
+ record = getTypeConformingRecord();
for (int i = 0; i < NUM_ROUNDS; i++) {
transformer.transform(record);
- assertEquals(record.getValue("svJson"), "{\"first\": \"daffy\",
\"last\": \"duck\"}");
+ assertEquals(record.getValue("svJson"),
"{\"first\":\"daffy\",\"last\":\"duck\"}");
assertFalse(record.isSanitized());
}
// scenario where json field exceeds max length and fieldSpec
maxLengthExceedStrategy is to TRIM_LENGTH
svJson.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.TRIM_LENGTH);
transformer = new SanitizationTransformer(schema);
- record = getRecord();
+ record = getTypeConformingRecord();
for (int i = 0; i < NUM_ROUNDS; i++) {
transformer.transform(record);
- assertEquals(record.getValue("svJson"), "{\"first\": ");
+ assertEquals(record.getValue("svJson"), "{\"first\":\"");
assertTrue(record.isSanitized());
}
// scenario where json field exceeds max length and fieldSpec
maxLengthExceedStrategy is to SUBSTITUTE_DEFAULT_VALUE
svJson.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.SUBSTITUTE_DEFAULT_VALUE);
transformer = new SanitizationTransformer(schema);
- record = getRecord();
+ record = getTypeConformingRecord();
for (int i = 0; i < NUM_ROUNDS; i++) {
transformer.transform(record);
assertEquals(record.getValue("svJson"), "null");
@@ -416,14 +422,14 @@ public class RecordTransformerTest {
// scenario where json field exceeds max length and fieldSpec
maxLengthExceedStrategy is to ERROR
svJson.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.ERROR);
transformer = new SanitizationTransformer(schema);
- record = getRecord();
+ record = getTypeConformingRecord();
for (int i = 0; i < NUM_ROUNDS; i++) {
try {
transformer.transform(record);
fail();
} catch (IllegalStateException e) {
assertEquals(e.getMessage(),
- "Throwing exception as value: {\"first\": \"daffy\", \"last\":
\"duck\"} for column svJson exceeds "
+ "Throwing exception as value:
{\"first\":\"daffy\",\"last\":\"duck\"} for column svJson exceeds "
+ "configured max length 10.");
}
}
@@ -433,23 +439,23 @@ public class RecordTransformerTest {
// scenario where bytes field exceeds max length and fieldSpec
maxLengthExceedStrategy is to NO_ACTION
FieldSpec svBytes = schema.getFieldSpecFor("svBytes");
- svBytes.setMaxLength(2);
+ svBytes.setMaxLength(1);
svBytes.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.NO_ACTION);
transformer = new SanitizationTransformer(schema);
- record = getRecord();
+ record = getTypeConformingRecord();
for (int i = 0; i < NUM_ROUNDS; i++) {
transformer.transform(record);
- assertEquals(record.getValue("svBytes"), "7b7b");
+ assertEquals(record.getValue("svBytes"), new byte[]{123, 123});
assertFalse(record.isSanitized());
}
// scenario where bytes field exceeds max length and fieldSpec
maxLengthExceedStrategy is to TRIM_LENGTH
svBytes.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.TRIM_LENGTH);
transformer = new SanitizationTransformer(schema);
- record = getRecord();
+ record = getTypeConformingRecord();
for (int i = 0; i < NUM_ROUNDS; i++) {
transformer.transform(record);
- assertEquals(record.getValue("svBytes"), "7b");
+ assertEquals(record.getValue("svBytes"), new byte[]{123});
assertTrue(record.isSanitized());
}
@@ -457,24 +463,23 @@ public class RecordTransformerTest {
// SUBSTITUTE_DEFAULT_VALUE
svBytes.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.SUBSTITUTE_DEFAULT_VALUE);
transformer = new SanitizationTransformer(schema);
- record = getRecord();
+ record = getTypeConformingRecord();
for (int i = 0; i < NUM_ROUNDS; i++) {
transformer.transform(record);
- assertEquals(record.getValue("svBytes"), BytesUtils.toHexString(new
byte[0]));
+ assertEquals(record.getValue("svBytes"), new byte[0]);
assertTrue(record.isSanitized());
}
// scenario where bytes field exceeds max length and fieldSpec
maxLengthExceedStrategy is to ERROR
svBytes.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.ERROR);
transformer = new SanitizationTransformer(schema);
- record = getRecord();
+ record = getTypeConformingRecord();
for (int i = 0; i < NUM_ROUNDS; i++) {
try {
transformer.transform(record);
fail();
} catch (IllegalStateException e) {
- assertEquals(e.getMessage(),
- "Throwing exception as value: 7b7b for column svBytes exceeds
configured max length 2.");
+ assertEquals(e.getMessage(), "Throwing exception as value for column
svBytes exceeds configured max length 1.");
}
}
}
@@ -482,7 +487,7 @@ public class RecordTransformerTest {
@Test
public void testSpecialValueTransformer() {
RecordTransformer transformer = new SpecialValueTransformer(SCHEMA);
- GenericRow record = getRecord();
+ GenericRow record = getTypeConformingRecord();
for (int i = 0; i < NUM_ROUNDS; i++) {
transformer.transform(record);
assertEquals(Float.floatToRawIntBits((float)
record.getValue("svFloatNegativeZero")),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]