This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d78b39b6ca8 [ingestion] Add rate-limited exception logging for
transformers (#17239)
d78b39b6ca8 is described below
commit d78b39b6ca850d9aa2d50f4c47a9a92a1c8a4745
Author: Suvodeep Pyne <[email protected]>
AuthorDate: Tue Nov 25 13:37:39 2025 -0800
[ingestion] Add rate-limited exception logging for transformers (#17239)
---
.../org/apache/pinot/common/utils/RateLimiter.java | 93 +++++++++++++
.../apache/pinot/common/utils/ThrottledLogger.java | 97 +++++++++++++
.../apache/pinot/common/utils/RateLimiterTest.java | 148 ++++++++++++++++++++
.../pinot/common/utils/ThrottledLoggerTest.java | 150 +++++++++++++++++++++
.../recordtransformer/ComplexTypeTransformer.java | 8 +-
.../recordtransformer/DataTypeTransformer.java | 5 +-
.../recordtransformer/ExpressionTransformer.java | 5 +-
.../local/recordtransformer/FilterTransformer.java | 10 +-
.../SchemaConformingTransformer.java | 5 +-
.../TimeValidationTransformer.java | 7 +-
.../config/table/ingestion/IngestionConfig.java | 13 ++
.../apache/pinot/spi/utils/CommonConstants.java | 4 +
12 files changed, 534 insertions(+), 11 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/RateLimiter.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/RateLimiter.java
new file mode 100644
index 00000000000..f43ddcf7033
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/RateLimiter.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.function.LongSupplier;
+import javax.annotation.concurrent.NotThreadSafe;
+
+
+/**
+ * A lightweight, non-thread-safe token bucket.
+ *
+ * <p>The bucket starts full and is refilled continuously at {@code rateLimitPerMin} tokens per minute. Each
+ * successful {@link #tryAcquire()} consumes one token. Rejected attempts are counted internally so that callers
+ * can report how many events were suppressed without allocating a separate counter object.
+ *
+ * <p>For a positive rate the bucket holds {@code max(1, rateLimitPerMin)} tokens, so rates below one per minute
+ * still allow a single acquisition once enough time has passed. A non-positive rate never grants a token.
+ */
+@NotThreadSafe
+class RateLimiter {
+  private static final double NANOS_PER_MINUTE = 60_000_000_000.0;
+
+  private final double _capacity;
+  private final double _tokensPerNano;
+  private final LongSupplier _nanoTimeSupplier;
+
+  private double _tokens;
+  private long _lastRefillNano;
+  private long _droppedCount;
+
+  /**
+   * Creates a limiter that reads time from {@link System#nanoTime()}.
+   *
+   * @param rateLimitPerMin sustained number of acquisitions allowed per minute
+   */
+  RateLimiter(double rateLimitPerMin) {
+    this(rateLimitPerMin, System::nanoTime);
+  }
+
+  /**
+   * Creates a limiter with an injectable clock.
+   *
+   * @param rateLimitPerMin sustained number of acquisitions allowed per minute
+   * @param nanoTimeSupplier monotonic clock returning nanoseconds
+   */
+  @VisibleForTesting
+  RateLimiter(double rateLimitPerMin, LongSupplier nanoTimeSupplier) {
+    boolean enabled = rateLimitPerMin > 0;
+    // A bucket smaller than one token could never satisfy tryAcquire(), so positive rates below 1/min are given
+    // room for exactly one token. Non-positive rates keep an empty bucket that is never refilled.
+    _capacity = enabled ? Math.max(1.0, rateLimitPerMin) : 0.0;
+    _tokensPerNano = enabled ? rateLimitPerMin / NANOS_PER_MINUTE : 0.0;
+    _tokens = _capacity;
+    _nanoTimeSupplier = nanoTimeSupplier;
+    _lastRefillNano = nanoTimeSupplier.getAsLong();
+    _droppedCount = 0;
+  }
+
+  /**
+   * Refills the bucket for the time elapsed since the previous call, then tries to consume one token.
+   *
+   * @return {@code true} if a token was consumed; {@code false} if the bucket held less than one token, in which
+   *         case the dropped count is incremented
+   */
+  boolean tryAcquire() {
+    long now = _nanoTimeSupplier.getAsLong();
+
+    // Refill proportionally to the elapsed time, never exceeding the bucket capacity.
+    long elapsedNanos = now - _lastRefillNano;
+    if (elapsedNanos > 0) {
+      _tokens = Math.min(_capacity, _tokens + elapsedNanos * _tokensPerNano);
+      _lastRefillNano = now;
+    }
+
+    if (_tokens >= 1.0) {
+      _tokens -= 1.0;
+      return true;
+    }
+
+    _droppedCount++;
+    return false;
+  }
+
+  /**
+   * Returns the number of rejected {@link #tryAcquire()} attempts recorded since the previous call to this method,
+   * and resets the counter to zero.
+   */
+  long getAndResetDroppedCount() {
+    long count = _droppedCount;
+    _droppedCount = 0;
+    return count;
+  }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/ThrottledLogger.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/ThrottledLogger.java
new file mode 100644
index 00000000000..7c462084afd
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/ThrottledLogger.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.slf4j.Logger;
+
+
+/**
+ * Rate-limited exception logger that prevents log flooding while maintaining visibility into errors.
+ *
+ * <p><b>Key Features:</b>
+ * <ul>
+ *   <li>Class-based fingerprinting: each exception class gets its own rate limiter</li>
+ *   <li>Suppression tracking: the next log that passes the limiter is preceded by a line reporting how many
+ *       logs of that exception class were dropped</li>
+ *   <li>Zero dependencies: uses the in-house {@link RateLimiter} token bucket</li>
+ * </ul>
+ *
+ * <p>Messages rejected by the limiter, and all messages when the configured rate is not positive, are forwarded
+ * to the delegate at debug level.
+ *
+ * <p><b>Note:</b> This class is designed for single-threaded access per instance.
+ */
+@NotThreadSafe
+public class ThrottledLogger {
+  private final Logger _delegate;
+  private final Map<Class<?>, RateLimiter> _rateLimiterMap = new HashMap<>();
+  private final double _rateLimitPerMin;
+
+  /**
+   * Creates a throttled logger whose rate comes from the ingestion config.
+   *
+   * @param delegate logger that receives the messages
+   * @param ingestionConfig source of the per-minute rate limit; {@code null} is treated as a rate of 0
+   */
+  public ThrottledLogger(Logger delegate, @Nullable IngestionConfig ingestionConfig) {
+    this(delegate, getRateLimitPerMin(ingestionConfig));
+  }
+
+  /**
+   * Creates a throttled logger with an explicit rate.
+   *
+   * @param delegate logger that receives the messages
+   * @param rateLimitPerMin logs allowed per minute for each exception class; a value of 0 or less sends every
+   *                        message to debug level
+   */
+  public ThrottledLogger(Logger delegate, double rateLimitPerMin) {
+    _delegate = delegate;
+    _rateLimitPerMin = rateLimitPerMin;
+  }
+
+  /** Reads the rate limit from the config, falling back to 0 when no config is present. */
+  private static double getRateLimitPerMin(@Nullable IngestionConfig ingestionConfig) {
+    return ingestionConfig != null ? ingestionConfig.getIngestionExceptionLogRateLimitPerMin() : 0;
+  }
+
+  /**
+   * Logs at warn level, subject to the rate limit of the throwable's class.
+   *
+   * @param msg message to log
+   * @param t throwable to attach; may be {@code null}
+   */
+  public void warn(String msg, @Nullable Throwable t) {
+    logWithRateLimit(msg, t, _delegate::warn);
+  }
+
+  /**
+   * Logs at error level, subject to the rate limit of the throwable's class.
+   *
+   * @param msg message to log
+   * @param t throwable to attach; may be {@code null}
+   */
+  public void error(String msg, @Nullable Throwable t) {
+    logWithRateLimit(msg, t, _delegate::error);
+  }
+
+  /**
+   * Emits the message through {@code consumer} if the limiter for the throwable's class grants a token, otherwise
+   * forwards it to debug level.
+   */
+  private void logWithRateLimit(String msg, @Nullable Throwable t, BiConsumer<String, Throwable> consumer) {
+    if (_rateLimitPerMin <= 0) {
+      _delegate.debug(msg, t);
+      return;
+    }
+
+    // A logging helper must not throw from inside a catch block: a missing throwable shares one limiter keyed on
+    // Throwable.class instead of triggering a NullPointerException.
+    Class<?> exceptionClass = t != null ? t.getClass() : Throwable.class;
+    RateLimiter limiter = _rateLimiterMap.computeIfAbsent(exceptionClass, k -> new RateLimiter(_rateLimitPerMin));
+
+    if (!limiter.tryAcquire()) {
+      // The limiter has already counted this rejection; keep the details reachable at debug level.
+      _delegate.debug(msg, t);
+      return;
+    }
+
+    // Report how many logs were suppressed before this one got through.
+    long droppedCount = limiter.getAndResetDroppedCount();
+    if (droppedCount > 0) {
+      consumer.accept(String.format("Dropped %d occurrences of %s",
+          droppedCount,
+          exceptionClass.getSimpleName()), null);
+    }
+    consumer.accept(msg, t);
+  }
+}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/RateLimiterTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/RateLimiterTest.java
new file mode 100644
index 00000000000..e2d99d39d5c
--- /dev/null
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/RateLimiterTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.util.function.LongSupplier;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+/**
+ * Unit tests for {@link RateLimiter}, driven by a clock that only moves when a test advances it.
+ */
+public class RateLimiterTest {
+
+  /**
+   * Clock stub whose reading changes only through {@link #advanceSeconds(long)}.
+   */
+  private static class ControllableTimeSource implements LongSupplier {
+    private static final long NANOS_PER_SECOND = 1_000_000_000L;
+
+    private long _currentNanos;
+
+    ControllableTimeSource(long initialNanos) {
+      _currentNanos = initialNanos;
+    }
+
+    @Override
+    public long getAsLong() {
+      return _currentNanos;
+    }
+
+    public void advanceSeconds(long seconds) {
+      _currentNanos += seconds * NANOS_PER_SECOND;
+    }
+  }
+
+  /** Asserts that the next {@code attempts} acquisitions are all granted. */
+  private static void expectGranted(RateLimiter limiter, int attempts) {
+    for (int attempt = 0; attempt < attempts; attempt++) {
+      assertThat(limiter.tryAcquire()).isTrue();
+    }
+  }
+
+  /** Asserts that the next {@code attempts} acquisitions are all rejected. */
+  private static void expectRejected(RateLimiter limiter, int attempts) {
+    for (int attempt = 0; attempt < attempts; attempt++) {
+      assertThat(limiter.tryAcquire()).isFalse();
+    }
+  }
+
+  @Test
+  public void testRateLimiterInitialCapacity() {
+    RateLimiter limiter = new RateLimiter(5.0, new ControllableTimeSource(0));
+
+    // The bucket starts full: exactly five acquisitions pass before the sixth is rejected.
+    String[] ordinals = {"First", "Second", "Third", "Fourth", "Fifth"};
+    for (String ordinal : ordinals) {
+      assertThat(limiter.tryAcquire()).describedAs(ordinal + " acquire should succeed").isTrue();
+    }
+    assertThat(limiter.tryAcquire()).describedAs("Sixth acquire should fail").isFalse();
+  }
+
+  @Test
+  public void testRateLimiterRejectIncrementsDropCount() {
+    RateLimiter limiter = new RateLimiter(2.0, new ControllableTimeSource(0));
+
+    // Use up both initial tokens.
+    limiter.tryAcquire();
+    limiter.tryAcquire();
+
+    for (int attempt = 0; attempt < 10; attempt++) {
+      assertThat(limiter.tryAcquire()).describedAs("Acquire should fail when tokens exhausted").isFalse();
+    }
+
+    assertThat(limiter.getAndResetDroppedCount()).isEqualTo(10);
+  }
+
+  @Test
+  public void testRateLimiterDropCountResets() {
+    ControllableTimeSource clock = new ControllableTimeSource(0);
+    RateLimiter limiter = new RateLimiter(5.0, clock);
+
+    // Drain the bucket without asserting on the outcome.
+    for (int attempt = 0; attempt < 5; attempt++) {
+      limiter.tryAcquire();
+    }
+
+    expectRejected(limiter, 100);
+    assertThat(limiter.getAndResetDroppedCount()).isEqualTo(100);
+    // Reading the counter clears it.
+    assertThat(limiter.getAndResetDroppedCount()).isEqualTo(0);
+
+    // A full minute restores the whole bucket; the next rejection starts a fresh count.
+    clock.advanceSeconds(60);
+    expectGranted(limiter, 5);
+    expectRejected(limiter, 1);
+    assertThat(limiter.getAndResetDroppedCount()).isEqualTo(1);
+  }
+
+  @Test
+  public void testRateLimiterRefillsTokensOverTime() {
+    ControllableTimeSource clock = new ControllableTimeSource(0);
+    RateLimiter limiter = new RateLimiter(10.0, clock);
+
+    expectGranted(limiter, 10);
+    expectRejected(limiter, 1);
+
+    // 10 per minute means one token every six seconds.
+    clock.advanceSeconds(6);
+    expectGranted(limiter, 1);
+
+    // Waiting longer than a minute refills to capacity, not beyond it.
+    clock.advanceSeconds(120);
+    expectGranted(limiter, 10);
+    expectRejected(limiter, 1);
+  }
+
+  @Test
+  public void testRateLimiterZeroTimeDelta() {
+    RateLimiter limiter = new RateLimiter(10.0, new ControllableTimeSource(0));
+
+    // The clock never moves, so only the initial tokens can be granted.
+    int granted = 0;
+    for (int attempt = 0; attempt < 20; attempt++) {
+      granted += limiter.tryAcquire() ? 1 : 0;
+    }
+
+    assertThat(granted).isEqualTo(10);
+    assertThat(limiter.getAndResetDroppedCount()).isEqualTo(10);
+  }
+}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/ThrottledLoggerTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/ThrottledLoggerTest.java
new file mode 100644
index 00000000000..1e3ad3c2880
--- /dev/null
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/ThrottledLoggerTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.io.IOException;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.mockito.ArgumentCaptor;
+import org.slf4j.Logger;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit tests for {@link ThrottledLogger} using a mocked SLF4J {@link Logger} as the delegate.
+ */
+public class ThrottledLoggerTest {
+
+  /** Sends the same warn call {@code times} times with one shared throwable instance. */
+  private static void warnRepeatedly(ThrottledLogger logger, String msg, Throwable t, int times) {
+    for (int call = 0; call < times; call++) {
+      logger.warn(msg, t);
+    }
+  }
+
+  /** Sends the same error call {@code times} times with one shared throwable instance. */
+  private static void errorRepeatedly(ThrottledLogger logger, String msg, Throwable t, int times) {
+    for (int call = 0; call < times; call++) {
+      logger.error(msg, t);
+    }
+  }
+
+  @Test
+  public void testIndependentRateLimitingPerExceptionClass() {
+    Logger delegate = mock(Logger.class);
+    ThrottledLogger logger = new ThrottledLogger(delegate, 2.0 / 60.0);
+
+    // A fresh exception instance per call, as in production where every failure creates its own throwable.
+    for (int call = 0; call < 100; call++) {
+      logger.warn("Error", new NumberFormatException("Invalid number"));
+    }
+    for (int call = 0; call < 100; call++) {
+      logger.warn("Error", new IllegalArgumentException("Invalid argument"));
+    }
+    for (int call = 0; call < 100; call++) {
+      logger.error("Error", new IOException("IO error"));
+    }
+
+    verify(delegate, atMost(3)).warn(eq("Error"), any(NumberFormatException.class));
+    verify(delegate, atMost(3)).warn(eq("Error"), any(IllegalArgumentException.class));
+    verify(delegate, atMost(3)).error(eq("Error"), any(IOException.class));
+  }
+
+  @Test
+  public void testDefaultIngestionConfigDisablesThrottling() {
+    Logger delegate = mock(Logger.class);
+    // No ingestion config: everything is expected at debug level only.
+    ThrottledLogger logger = new ThrottledLogger(delegate, null);
+
+    for (int call = 0; call < 100; call++) {
+      logger.warn("Error", new NumberFormatException("Invalid number"));
+    }
+
+    verify(delegate, never()).warn(anyString(), any(Throwable.class));
+    verify(delegate, times(100)).debug(eq("Error"), any(NumberFormatException.class));
+  }
+
+  @Test
+  public void testExplicitZeroRateLimitDisablesThrottling() {
+    Logger delegate = mock(Logger.class);
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setIngestionExceptionLogRateLimitPerMin(0);
+    ThrottledLogger logger = new ThrottledLogger(delegate, ingestionConfig);
+
+    for (int call = 0; call < 100; call++) {
+      logger.warn("Error", new NumberFormatException("Invalid number"));
+    }
+
+    verify(delegate, never()).warn(anyString(), any(Throwable.class));
+    verify(delegate, times(100)).debug(eq("Error"), any(NumberFormatException.class));
+  }
+
+  @Test
+  public void testDroppedCountReportedWithCorrectMessage() {
+    Logger delegate = mock(Logger.class);
+    ThrottledLogger logger = new ThrottledLogger(delegate, 1.0);
+
+    // One token per minute: the first call is logged, the other nine fall back to debug.
+    NumberFormatException numberFormat = new NumberFormatException("Test");
+    warnRepeatedly(logger, "Error occurred", numberFormat, 10);
+
+    verify(delegate, times(1)).warn(eq("Error occurred"), eq(numberFormat));
+    verify(delegate, times(9)).debug(eq("Error occurred"), eq(numberFormat));
+
+    // A different exception class has its own, still full, bucket.
+    IllegalArgumentException illegalArgument = new IllegalArgumentException("Different");
+    logger.warn("Different error", illegalArgument);
+
+    verify(delegate, times(1)).warn(eq("Different error"), eq(illegalArgument));
+  }
+
+  @Test
+  public void testDroppedCountIndependentPerExceptionClass() {
+    Logger delegate = mock(Logger.class);
+    ThrottledLogger logger = new ThrottledLogger(delegate, 2.0);
+
+    NumberFormatException exceptionA = new NumberFormatException("Test A");
+    warnRepeatedly(logger, "Error A", exceptionA, 10);
+
+    IllegalArgumentException exceptionB = new IllegalArgumentException("Test B");
+    errorRepeatedly(logger, "Error B", exceptionB, 5);
+
+    verify(delegate, times(2)).warn(eq("Error A"), eq(exceptionA));
+    verify(delegate, times(8)).debug(eq("Error A"), eq(exceptionA));
+
+    verify(delegate, times(2)).error(eq("Error B"), eq(exceptionB));
+    verify(delegate, times(3)).debug(eq("Error B"), eq(exceptionB));
+
+    verify(delegate, times(2)).warn(anyString(), any(NumberFormatException.class));
+    verify(delegate, times(2)).error(anyString(), any(IllegalArgumentException.class));
+  }
+
+  @Test
+  public void testNoSuppressionMessageWhenZeroDrops() {
+    Logger delegate = mock(Logger.class);
+    ThrottledLogger logger = new ThrottledLogger(delegate, 10.0);
+
+    NumberFormatException numberFormat = new NumberFormatException("Test");
+    logger.warn("First log", numberFormat);
+
+    verify(delegate, times(1)).warn(eq("First log"), eq(numberFormat));
+    verify(delegate, never()).debug(anyString(), any(Throwable.class));
+
+    logger.warn("Second log", numberFormat);
+
+    verify(delegate, times(1)).warn(eq("Second log"), eq(numberFormat));
+
+    // Nothing was dropped, so no warn message may carry the suppression summary.
+    ArgumentCaptor<String> warnMessages = ArgumentCaptor.forClass(String.class);
+    verify(delegate, atLeast(0)).warn(warnMessages.capture(), any(Throwable.class));
+
+    for (String warnMessage : warnMessages.getAllValues()) {
+      assertThat(warnMessage).doesNotContain("Dropped");
+    }
+  }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformer.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformer.java
index 0c4d7cfba88..8d251c0d084 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformer.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformer.java
@@ -29,6 +29,7 @@ import java.util.Objects;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.common.function.scalar.JsonFunctions;
+import org.apache.pinot.common.utils.ThrottledLogger;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
import
org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig.CollectionNotUnnestedToJson;
@@ -98,6 +99,7 @@ public class ComplexTypeTransformer implements
RecordTransformer {
private final CollectionNotUnnestedToJson _collectionNotUnnestedToJson;
private final Map<String, String> _prefixesToRename;
private final boolean _continueOnError;
+ private final ThrottledLogger _throttledLogger;
private final List<String> _fieldsToUnnestAndKeepOriginalValue;
private ComplexTypeTransformer(TableConfig tableConfig) {
@@ -122,6 +124,7 @@ public class ComplexTypeTransformer implements
RecordTransformer {
Objects.requireNonNullElse(complexTypeConfig.getCollectionNotUnnestedToJson(),
DEFAULT_COLLECTION_TO_JSON_MODE);
_prefixesToRename =
Objects.requireNonNullElse(complexTypeConfig.getPrefixesToRename(), Map.of());
_continueOnError = ingestionConfig.isContinueOnError();
+ _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
}
/// Returns a [ComplexTypeTransformer] if it is defined in the table config,
`null` otherwise.
@@ -143,6 +146,7 @@ public class ComplexTypeTransformer implements
RecordTransformer {
_collectionNotUnnestedToJson = collectionNotUnnestedToJson;
_prefixesToRename = prefixesToRename;
_continueOnError = continueOnError;
+ _throttledLogger = new ThrottledLogger(LOGGER, 0.0);
}
@VisibleForTesting
@@ -233,7 +237,7 @@ public class ComplexTypeTransformer implements
RecordTransformer {
if (!_continueOnError) {
throw new RuntimeException("Caught exception while transforming
complex types", e);
}
- LOGGER.debug("Caught exception while transforming complex types for
record: {}", record.toString(), e);
+ _throttledLogger.warn("Caught exception while transforming complex
types", e);
record.markIncomplete();
}
}
@@ -245,7 +249,7 @@ public class ComplexTypeTransformer implements
RecordTransformer {
if (!_continueOnError) {
throw new RuntimeException("Caught exception while renaming
prefixes", e);
}
- LOGGER.debug("Caught exception while renaming prefixes for record:
{}", record.toString(), e);
+ _throttledLogger.warn("Caught exception while renaming prefixes", e);
record.markIncomplete();
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
index 1a69069945e..1c91136602e 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.common.utils.ThrottledLogger;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.FieldSpec;
@@ -50,6 +51,7 @@ public class DataTypeTransformer implements RecordTransformer
{
private final Map<String, PinotDataType> _dataTypes = new HashMap<>();
private final boolean _continueOnError;
+ private final ThrottledLogger _throttledLogger;
public DataTypeTransformer(TableConfig tableConfig, Schema schema) {
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
@@ -59,6 +61,7 @@ public class DataTypeTransformer implements RecordTransformer
{
}
IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
_continueOnError = ingestionConfig != null &&
ingestionConfig.isContinueOnError();
+ _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
}
@Override
@@ -121,7 +124,7 @@ public class DataTypeTransformer implements
RecordTransformer {
if (!_continueOnError) {
throw new RuntimeException("Caught exception while transforming data
type for column: " + column, e);
}
- LOGGER.debug("Caught exception while transforming data type for
column: {}", column, e);
+ _throttledLogger.warn("Caught exception while transforming data type
for column: " + column, e);
record.putValue(column, null);
record.markIncomplete();
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
index c9bfe192f77..7c62372f409 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
@@ -27,6 +27,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.pinot.common.utils.ThrottledLogger;
import org.apache.pinot.segment.local.function.FunctionEvaluator;
import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -52,6 +53,7 @@ public class ExpressionTransformer implements
RecordTransformer {
@VisibleForTesting
final LinkedHashMap<String, FunctionEvaluator> _expressionEvaluators = new
LinkedHashMap<>();
private final boolean _continueOnError;
+ private final ThrottledLogger _throttledLogger;
public ExpressionTransformer(TableConfig tableConfig, Schema schema) {
Map<String, FunctionEvaluator> expressionEvaluators = new HashMap<>();
@@ -88,6 +90,7 @@ public class ExpressionTransformer implements
RecordTransformer {
}
_continueOnError = ingestionConfig != null &&
ingestionConfig.isContinueOnError();
+ _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
}
private void topologicalSort(String column, Map<String, FunctionEvaluator>
expressionEvaluators,
@@ -148,7 +151,7 @@ public class ExpressionTransformer implements
RecordTransformer {
if (!_continueOnError) {
throw new RuntimeException("Caught exception while evaluation
transform function for column: " + column, e);
}
- LOGGER.debug("Caught exception while evaluation transform function
for column: {}", column, e);
+ _throttledLogger.warn("Caught exception while evaluation transform
function for column: " + column, e);
record.markIncomplete();
}
} else if (existingValue.getClass().isArray() || existingValue
instanceof Collections
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/FilterTransformer.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/FilterTransformer.java
index 79915c2b894..ebf1cd72ca0 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/FilterTransformer.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/FilterTransformer.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.recordtransformer;
import java.util.ArrayList;
import java.util.List;
+import org.apache.pinot.common.utils.ThrottledLogger;
import org.apache.pinot.segment.local.function.FunctionEvaluator;
import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -40,6 +41,7 @@ public class FilterTransformer implements RecordTransformer {
private final String _filterFunction;
private final FunctionEvaluator _evaluator;
private final boolean _continueOnError;
+ private final ThrottledLogger _throttledLogger;
private long _numRecordsFiltered;
@@ -52,6 +54,7 @@ public class FilterTransformer implements RecordTransformer {
}
_evaluator = _filterFunction != null ?
FunctionEvaluatorFactory.getExpressionEvaluator(_filterFunction) : null;
_continueOnError = ingestionConfig != null &&
ingestionConfig.isContinueOnError();
+ _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
}
@Override
@@ -79,11 +82,10 @@ public class FilterTransformer implements RecordTransformer
{
} catch (Exception e) {
if (!_continueOnError) {
throw new RuntimeException(
- String.format("Caught exception while executing filter function:
%s for record: %s", _filterFunction,
- record.toString()), e);
+ String.format("Caught exception while executing filter function:
%s", _filterFunction), e);
} else {
- LOGGER.debug("Caught exception while executing filter function: {}
for record: {}", _filterFunction,
- record.toString(), e);
+ _throttledLogger.warn(
+ String.format("Caught exception while executing filter function:
%s", _filterFunction), e);
record.markIncomplete();
filteredRecords.add(record);
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java
index 59d9a044e09..6663bf4116a 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java
@@ -34,6 +34,7 @@ import javax.annotation.Nullable;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.ThrottledLogger;
import org.apache.pinot.segment.local.utils.Base64Utils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
@@ -114,6 +115,7 @@ public class SchemaConformingTransformer implements
RecordTransformer {
private final int _jsonKeyValueSeparatorByteCount;
private final boolean _continueOnError;
private final ServerMetrics _serverMetrics;
+ private final ThrottledLogger _throttledLogger;
private final GenericRow _reusedOutputRecord = new GenericRow();
private final Map<String, Object> _reusedMergedTextIndexMap = new
HashMap<>();
@@ -143,6 +145,7 @@ public class SchemaConformingTransformer implements
RecordTransformer {
_transformerConfig.getJsonKeyValueSeparator().getBytes(StandardCharsets.UTF_8).length;
_continueOnError = ingestionConfig.isContinueOnError();
_serverMetrics = ServerMetrics.get();
+ _throttledLogger = new ThrottledLogger(_logger, ingestionConfig);
}
/// Returns a [ComplexTypeTransformer] if it is defined in the table config,
`null` otherwise.
@@ -377,7 +380,7 @@ public class SchemaConformingTransformer implements
RecordTransformer {
if (!_continueOnError) {
throw e;
}
- _logger.debug("Couldn't transform record: {}", record.toString(), e);
+ _throttledLogger.warn("Couldn't transform record", e);
outputRecord.markIncomplete();
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/TimeValidationTransformer.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/TimeValidationTransformer.java
index 613978439d3..a1c5fa71a3b 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/TimeValidationTransformer.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/TimeValidationTransformer.java
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.recordtransformer;
import com.google.common.base.Preconditions;
+import org.apache.pinot.common.utils.ThrottledLogger;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
@@ -46,6 +47,7 @@ public class TimeValidationTransformer implements
RecordTransformer {
private final DateTimeFormatSpec _timeFormatSpec;
private final boolean _enableTimeValueCheck;
private final boolean _continueOnError;
+ private final ThrottledLogger _throttledLogger;
public TimeValidationTransformer(TableConfig tableConfig, Schema schema) {
_timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
@@ -61,6 +63,7 @@ public class TimeValidationTransformer implements
RecordTransformer {
_timeFormatSpec = null;
_continueOnError = false;
}
+ _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
}
@Override
@@ -83,7 +86,7 @@ public class TimeValidationTransformer implements
RecordTransformer {
if (!_continueOnError) {
throw new IllegalStateException(errorMessage, e);
}
- LOGGER.debug(errorMessage, e);
+ _throttledLogger.warn(errorMessage, e);
record.putValue(_timeColumnName, null);
record.markIncomplete();
return;
@@ -95,7 +98,7 @@ public class TimeValidationTransformer implements
RecordTransformer {
if (!_continueOnError) {
throw new IllegalStateException(errorMessage);
}
- LOGGER.debug(errorMessage);
+ _throttledLogger.warn(errorMessage, new
IllegalStateException(errorMessage));
record.putValue(_timeColumnName, null);
record.markIncomplete();
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
index 2c3fc664b87..e44d0b89246 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
@@ -23,6 +23,7 @@ import
com.fasterxml.jackson.annotation.JsonPropertyDescription;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.spi.config.BaseJsonConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
/**
@@ -76,6 +77,10 @@ public class IngestionConfig extends BaseJsonConfig {
@JsonPropertyDescription("Configs related to check time value for segment")
private boolean _segmentTimeValueCheck = true;
+ @JsonPropertyDescription("Max exception logs per minute per exception class
(0 to disable)")
+ private int _ingestionExceptionLogRateLimitPerMin =
+
CommonConstants.IngestionConfigs.DEFAULT_INGESTION_EXCEPTION_LOG_RATE_LIMIT_PER_MIN;
+
@Deprecated
public IngestionConfig(@Nullable BatchIngestionConfig batchIngestionConfig,
@Nullable StreamIngestionConfig streamIngestionConfig, @Nullable
FilterConfig filterConfig,
@@ -152,6 +157,10 @@ public class IngestionConfig extends BaseJsonConfig {
return _segmentTimeValueCheck;
}
+ /**
+ * Returns the configured maximum number of exception logs per minute per exception class
+ * (per the property description, 0 disables them).
+ */
+ public int getIngestionExceptionLogRateLimitPerMin() {
+ return _ingestionExceptionLogRateLimitPerMin;
+ }
+
public void setBatchIngestionConfig(BatchIngestionConfig
batchIngestionConfig) {
_batchIngestionConfig = batchIngestionConfig;
}
@@ -200,4 +209,8 @@ public class IngestionConfig extends BaseJsonConfig {
public void setSegmentTimeValueCheck(boolean segmentTimeValueCheck) {
_segmentTimeValueCheck = segmentTimeValueCheck;
}
+
+ /**
+ * Sets the maximum number of exception logs per minute per exception class
+ * (per the property description, 0 disables them).
+ */
+ public void setIngestionExceptionLogRateLimitPerMin(int
ingestionExceptionLogRateLimitPerMin) {
+ _ingestionExceptionLogRateLimitPerMin =
ingestionExceptionLogRateLimitPerMin;
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index db3adb17963..f9bde07224d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -2091,6 +2091,10 @@ public class CommonConstants {
"pinot.field.spec.default.json.max.length";
}
+ /**
+ * Default values for ingestion-related table config options.
+ */
+ public static class IngestionConfigs {
+ // Initial value of IngestionConfig's exception log rate limit (logs per minute per exception class).
+ public static final int DEFAULT_INGESTION_EXCEPTION_LOG_RATE_LIMIT_PER_MIN
= 5;
+ }
+
/**
* Configuration for setting up groovy static analyzer.
* User can config different configuration for query and ingestion (table
creation and update) static analyzer.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]