This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d78b39b6ca8 [ingestion] Add rate-limited exception logging for transformers (#17239)
d78b39b6ca8 is described below

commit d78b39b6ca850d9aa2d50f4c47a9a92a1c8a4745
Author: Suvodeep Pyne <[email protected]>
AuthorDate: Tue Nov 25 13:37:39 2025 -0800

    [ingestion] Add rate-limited exception logging for transformers (#17239)
---
 .../org/apache/pinot/common/utils/RateLimiter.java |  93 +++++++++++++
 .../apache/pinot/common/utils/ThrottledLogger.java |  97 +++++++++++++
 .../apache/pinot/common/utils/RateLimiterTest.java | 148 ++++++++++++++++++++
 .../pinot/common/utils/ThrottledLoggerTest.java    | 150 +++++++++++++++++++++
 .../recordtransformer/ComplexTypeTransformer.java  |   8 +-
 .../recordtransformer/DataTypeTransformer.java     |   5 +-
 .../recordtransformer/ExpressionTransformer.java   |   5 +-
 .../local/recordtransformer/FilterTransformer.java |  10 +-
 .../SchemaConformingTransformer.java               |   5 +-
 .../TimeValidationTransformer.java                 |   7 +-
 .../config/table/ingestion/IngestionConfig.java    |  13 ++
 .../apache/pinot/spi/utils/CommonConstants.java    |   4 +
 12 files changed, 534 insertions(+), 11 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/RateLimiter.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/RateLimiter.java
new file mode 100644
index 00000000000..f43ddcf7033
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/RateLimiter.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.function.LongSupplier;
+import javax.annotation.concurrent.NotThreadSafe;
+
+
+/**
+ * A highly performant, non-thread-safe Token Bucket implementation.
+ * It tracks its own dropped count to avoid wrapper object allocations.
+ */
+@NotThreadSafe
+class RateLimiter {
+  private final double _capacity;
+  private final double _tokensPerNano;
+  private final LongSupplier _nanoTimeSupplier;
+
+  private double _tokens;
+  private long _lastRefillNano;
+  private long _droppedCount;
+
+  RateLimiter(double rateLimitPerMin) {
+    this(rateLimitPerMin, System::nanoTime);
+  }
+
+  @VisibleForTesting
+  RateLimiter(double rateLimitPerMin, LongSupplier nanoTimeSupplier) {
+    _capacity = rateLimitPerMin;
+    _tokens = rateLimitPerMin;
+    _tokensPerNano = rateLimitPerMin / 60_000_000_000.0;
+    _nanoTimeSupplier = nanoTimeSupplier;
+    _lastRefillNano = nanoTimeSupplier.getAsLong();
+    _droppedCount = 0;
+  }
+
+  /**
+   * Attempts to consume a token.
+   * Refills tokens based on elapsed time.
+   * @return true if token acquired, false if limit exceeded (and increments dropped count)
+   */
+  boolean tryAcquire() {
+    long now = _nanoTimeSupplier.getAsLong();
+
+    // 1. Refill
+    long delta = now - _lastRefillNano;
+    if (delta > 0) {
+      double newTokens = delta * _tokensPerNano;
+      _tokens = Math.min(_capacity, _tokens + newTokens);
+      _lastRefillNano = now;
+    }
+
+    // 2. Acquire
+    if (_tokens >= 1.0) {
+      _tokens -= 1.0;
+      return true;
+    }
+
+    // 3. Reject & Track
+    _droppedCount++;
+    return false;
+  }
+
+  /**
+   * Returns the number of logs dropped since the last successful acquire,
+   * and resets the counter to zero.
+   */
+  long getAndResetDroppedCount() {
+    if (_droppedCount == 0) {
+      return 0;
+    }
+    long count = _droppedCount;
+    _droppedCount = 0;
+    return count;
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/ThrottledLogger.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/ThrottledLogger.java
new file mode 100644
index 00000000000..7c462084afd
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/ThrottledLogger.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.slf4j.Logger;
+
+
+/**
+ * Rate-limited exception logger that prevents log flooding while maintaining visibility into errors.
+ *
+ * <p><b>Key Features:</b>
+ * <ul>
+ * <li>Class-based fingerprinting: Each exception class gets its own rate limiter</li>
+ * <li>Suppression tracking: Reports count of dropped logs when rate limit is lifted</li>
+ * <li>Zero Dependencies: Uses a lightweight in-house Token Bucket implementation</li>
+ * </ul>
+ *
+ * <p><b>Note:</b> This class is designed for single-threaded access per instance.
+ */
+@NotThreadSafe
+public class ThrottledLogger {
+  private final Logger _delegate;
+  private final Map<Class<?>, RateLimiter> _rateLimiterMap = new HashMap<>();
+  private final double _rateLimitPerMin;
+
+  public ThrottledLogger(Logger delegate, @Nullable IngestionConfig ingestionConfig) {
+    this(delegate, getRateLimitPerMin(ingestionConfig));
+  }
+
+  public ThrottledLogger(Logger delegate, double rateLimitPerMin) {
+    _delegate = delegate;
+    _rateLimitPerMin = rateLimitPerMin;
+  }
+
+  private static double getRateLimitPerMin(@Nullable IngestionConfig ingestionConfig) {
+    if (ingestionConfig == null) {
+      return 0;
+    }
+    return ingestionConfig.getIngestionExceptionLogRateLimitPerMin();
+  }
+
+  public void warn(String msg, Throwable t) {
+    logWithRateLimit(msg, t, _delegate::warn);
+  }
+
+  public void error(String msg, Throwable t) {
+    logWithRateLimit(msg, t, _delegate::error);
+  }
+
+  private void logWithRateLimit(String msg, Throwable t, BiConsumer<String, Throwable> consumer) {
+    if (_rateLimitPerMin <= 0) {
+      _delegate.debug(msg, t);
+      return;
+    }
+
+    Class<?> exceptionClass = t.getClass();
+    RateLimiter limiter = _rateLimiterMap.computeIfAbsent(exceptionClass, k -> new RateLimiter(_rateLimitPerMin));
+
+    // tryAcquire now automatically handles incrementing the drop count if it returns false
+    if (limiter.tryAcquire()) {
+      // If we successfully acquired, check if we had previously dropped logs
+      long droppedCount = limiter.getAndResetDroppedCount();
+      if (droppedCount > 0) {
+        consumer.accept(String.format("Dropped %d occurrences of %s",
+            droppedCount,
+            exceptionClass.getSimpleName()), null);
+      }
+      consumer.accept(msg, t);
+    } else {
+      // If acquire failed, the limiter has already incremented its internal dropped counter.
+      // We still debug log so the data isn't completely lost.
+      _delegate.debug(msg, t);
+    }
+  }
+}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/RateLimiterTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/RateLimiterTest.java
new file mode 100644
index 00000000000..e2d99d39d5c
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/RateLimiterTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.util.function.LongSupplier;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+public class RateLimiterTest {
+
+  /**
+   * Helper class to control time in tests for precise time-based assertions.
+   */
+  private static class ControllableTimeSource implements LongSupplier {
+    private long _currentNanos;
+
+    ControllableTimeSource(long initialNanos) {
+      _currentNanos = initialNanos;
+    }
+
+    @Override
+    public long getAsLong() {
+      return _currentNanos;
+    }
+
+    public void advanceSeconds(long seconds) {
+      _currentNanos += seconds * 1_000_000_000L;
+    }
+  }
+
+  @Test
+  public void testRateLimiterInitialCapacity() {
+    ControllableTimeSource timeSource = new ControllableTimeSource(0);
+    RateLimiter limiter = new RateLimiter(5.0, timeSource);
+
+    boolean[] results = new boolean[6];
+    for (int i = 0; i < 6; i++) {
+      results[i] = limiter.tryAcquire();
+    }
+
+    assertThat(results[0]).describedAs("First acquire should 
succeed").isTrue();
+    assertThat(results[1]).describedAs("Second acquire should 
succeed").isTrue();
+    assertThat(results[2]).describedAs("Third acquire should 
succeed").isTrue();
+    assertThat(results[3]).describedAs("Fourth acquire should 
succeed").isTrue();
+    assertThat(results[4]).describedAs("Fifth acquire should 
succeed").isTrue();
+    assertThat(results[5]).describedAs("Sixth acquire should fail").isFalse();
+  }
+
+  @Test
+  public void testRateLimiterRejectIncrementsDropCount() {
+    ControllableTimeSource timeSource = new ControllableTimeSource(0);
+    RateLimiter limiter = new RateLimiter(2.0, timeSource);
+
+    limiter.tryAcquire();
+    limiter.tryAcquire();
+
+    for (int i = 0; i < 10; i++) {
+      boolean result = limiter.tryAcquire();
+      assertThat(result).describedAs("Acquire should fail when tokens 
exhausted").isFalse();
+    }
+
+    long droppedCount = limiter.getAndResetDroppedCount();
+    assertThat(droppedCount).isEqualTo(10);
+  }
+
+  @Test
+  public void testRateLimiterDropCountResets() {
+    ControllableTimeSource timeSource = new ControllableTimeSource(0);
+    RateLimiter limiter = new RateLimiter(5.0, timeSource);
+
+    for (int i = 0; i < 5; i++) {
+      limiter.tryAcquire();
+    }
+
+    for (int i = 0; i < 100; i++) {
+      assertThat(limiter.tryAcquire()).isFalse();
+    }
+
+    long droppedBeforeRefill = limiter.getAndResetDroppedCount();
+    assertThat(droppedBeforeRefill).isEqualTo(100);
+
+    long secondCall = limiter.getAndResetDroppedCount();
+    assertThat(secondCall).isEqualTo(0);
+
+    timeSource.advanceSeconds(60);
+    for (int i = 0; i < 5; i++) {
+      assertThat(limiter.tryAcquire()).isTrue();
+    }
+    assertThat(limiter.tryAcquire()).isFalse();
+
+    long droppedAfterRefill = limiter.getAndResetDroppedCount();
+    assertThat(droppedAfterRefill).isEqualTo(1);
+  }
+
+  @Test
+  public void testRateLimiterRefillsTokensOverTime() {
+    ControllableTimeSource timeSource = new ControllableTimeSource(0);
+    RateLimiter limiter = new RateLimiter(10.0, timeSource);
+
+    for (int i = 0; i < 10; i++) {
+      assertThat(limiter.tryAcquire()).isTrue();
+    }
+    assertThat(limiter.tryAcquire()).isFalse();
+
+    timeSource.advanceSeconds(6);
+    assertThat(limiter.tryAcquire()).isTrue();
+
+    timeSource.advanceSeconds(120);
+    for (int i = 0; i < 10; i++) {
+      assertThat(limiter.tryAcquire()).isTrue();
+    }
+    assertThat(limiter.tryAcquire()).isFalse();
+  }
+
+  @Test
+  public void testRateLimiterZeroTimeDelta() {
+    ControllableTimeSource timeSource = new ControllableTimeSource(0);
+    RateLimiter limiter = new RateLimiter(10.0, timeSource);
+
+    int successCount = 0;
+    for (int i = 0; i < 20; i++) {
+      if (limiter.tryAcquire()) {
+        successCount++;
+      }
+    }
+
+    assertThat(successCount).isEqualTo(10);
+    assertThat(limiter.getAndResetDroppedCount()).isEqualTo(10);
+  }
+}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/ThrottledLoggerTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/ThrottledLoggerTest.java
new file mode 100644
index 00000000000..1e3ad3c2880
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/ThrottledLoggerTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.io.IOException;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.mockito.ArgumentCaptor;
+import org.slf4j.Logger;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.*;
+
+
+public class ThrottledLoggerTest {
+
+  @Test
+  public void testIndependentRateLimitingPerExceptionClass() {
+    Logger mockLogger = mock(Logger.class);
+    ThrottledLogger throttledLogger = new ThrottledLogger(mockLogger, 2.0 / 
60.0);
+
+    for (int i = 0; i < 100; i++) {
+      throttledLogger.warn("Error", new NumberFormatException("Invalid 
number"));
+    }
+    for (int i = 0; i < 100; i++) {
+      throttledLogger.warn("Error", new IllegalArgumentException("Invalid 
argument"));
+    }
+    for (int i = 0; i < 100; i++) {
+      throttledLogger.error("Error", new IOException("IO error"));
+    }
+
+    verify(mockLogger, atMost(3)).warn(eq("Error"), 
any(NumberFormatException.class));
+    verify(mockLogger, atMost(3)).warn(eq("Error"), 
any(IllegalArgumentException.class));
+    verify(mockLogger, atMost(3)).error(eq("Error"), any(IOException.class));
+  }
+
+  @Test
+  public void testDefaultIngestionConfigDisablesThrottling() {
+    Logger mockLogger = mock(Logger.class);
+    ThrottledLogger throttledLogger = new ThrottledLogger(mockLogger, null);
+
+    for (int i = 0; i < 100; i++) {
+      throttledLogger.warn("Error", new NumberFormatException("Invalid 
number"));
+    }
+
+    verify(mockLogger, never()).warn(anyString(), any(Throwable.class));
+    verify(mockLogger, times(100)).debug(eq("Error"), 
any(NumberFormatException.class));
+  }
+
+  @Test
+  public void testExplicitZeroRateLimitDisablesThrottling() {
+    Logger mockLogger = mock(Logger.class);
+    IngestionConfig config = new IngestionConfig();
+    config.setIngestionExceptionLogRateLimitPerMin(0);
+    ThrottledLogger throttledLogger = new ThrottledLogger(mockLogger, config);
+
+    for (int i = 0; i < 100; i++) {
+      throttledLogger.warn("Error", new NumberFormatException("Invalid 
number"));
+    }
+
+    verify(mockLogger, never()).warn(anyString(), any(Throwable.class));
+    verify(mockLogger, times(100)).debug(eq("Error"), 
any(NumberFormatException.class));
+  }
+
+  @Test
+  public void testDroppedCountReportedWithCorrectMessage() {
+    Logger mockLogger = mock(Logger.class);
+    ThrottledLogger throttledLogger = new ThrottledLogger(mockLogger, 1.0);
+
+    NumberFormatException exception = new NumberFormatException("Test");
+    for (int i = 0; i < 10; i++) {
+      throttledLogger.warn("Error occurred", exception);
+    }
+
+    verify(mockLogger, times(1)).warn(eq("Error occurred"), eq(exception));
+    verify(mockLogger, times(9)).debug(eq("Error occurred"), eq(exception));
+
+    IllegalArgumentException differentException = new 
IllegalArgumentException("Different");
+    throttledLogger.warn("Different error", differentException);
+
+    verify(mockLogger, times(1)).warn(eq("Different error"), 
eq(differentException));
+  }
+
+  @Test
+  public void testDroppedCountIndependentPerExceptionClass() {
+    Logger mockLogger = mock(Logger.class);
+    ThrottledLogger throttledLogger = new ThrottledLogger(mockLogger, 2.0);
+
+    NumberFormatException exceptionA = new NumberFormatException("Test A");
+    for (int i = 0; i < 10; i++) {
+      throttledLogger.warn("Error A", exceptionA);
+    }
+
+    IllegalArgumentException exceptionB = new IllegalArgumentException("Test 
B");
+    for (int i = 0; i < 5; i++) {
+      throttledLogger.error("Error B", exceptionB);
+    }
+
+    verify(mockLogger, times(2)).warn(eq("Error A"), eq(exceptionA));
+    verify(mockLogger, times(8)).debug(eq("Error A"), eq(exceptionA));
+
+    verify(mockLogger, times(2)).error(eq("Error B"), eq(exceptionB));
+    verify(mockLogger, times(3)).debug(eq("Error B"), eq(exceptionB));
+
+    verify(mockLogger, times(2)).warn(anyString(), 
any(NumberFormatException.class));
+    verify(mockLogger, times(2)).error(anyString(), 
any(IllegalArgumentException.class));
+  }
+
+  @Test
+  public void testNoSuppressionMessageWhenZeroDrops() {
+    Logger mockLogger = mock(Logger.class);
+    ThrottledLogger throttledLogger = new ThrottledLogger(mockLogger, 10.0);
+
+    NumberFormatException exception = new NumberFormatException("Test");
+    throttledLogger.warn("First log", exception);
+
+    verify(mockLogger, times(1)).warn(eq("First log"), eq(exception));
+    verify(mockLogger, never()).debug(anyString(), any(Throwable.class));
+
+    throttledLogger.warn("Second log", exception);
+
+    verify(mockLogger, times(1)).warn(eq("Second log"), eq(exception));
+
+    ArgumentCaptor<String> messageCaptor = 
ArgumentCaptor.forClass(String.class);
+    verify(mockLogger, atLeast(0)).warn(messageCaptor.capture(), 
any(Throwable.class));
+
+    for (String message : messageCaptor.getAllValues()) {
+      assertThat(message).doesNotContain("Dropped");
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformer.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformer.java
index 0c4d7cfba88..8d251c0d084 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformer.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformer.java
@@ -29,6 +29,7 @@ import java.util.Objects;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.function.scalar.JsonFunctions;
+import org.apache.pinot.common.utils.ThrottledLogger;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
 import 
org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig.CollectionNotUnnestedToJson;
@@ -98,6 +99,7 @@ public class ComplexTypeTransformer implements 
RecordTransformer {
   private final CollectionNotUnnestedToJson _collectionNotUnnestedToJson;
   private final Map<String, String> _prefixesToRename;
   private final boolean _continueOnError;
+  private final ThrottledLogger _throttledLogger;
   private final List<String> _fieldsToUnnestAndKeepOriginalValue;
 
   private ComplexTypeTransformer(TableConfig tableConfig) {
@@ -122,6 +124,7 @@ public class ComplexTypeTransformer implements 
RecordTransformer {
         
Objects.requireNonNullElse(complexTypeConfig.getCollectionNotUnnestedToJson(), 
DEFAULT_COLLECTION_TO_JSON_MODE);
     _prefixesToRename = 
Objects.requireNonNullElse(complexTypeConfig.getPrefixesToRename(), Map.of());
     _continueOnError = ingestionConfig.isContinueOnError();
+    _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
   }
 
   /// Returns a [ComplexTypeTransformer] if it is defined in the table config, 
`null` otherwise.
@@ -143,6 +146,7 @@ public class ComplexTypeTransformer implements 
RecordTransformer {
     _collectionNotUnnestedToJson = collectionNotUnnestedToJson;
     _prefixesToRename = prefixesToRename;
     _continueOnError = continueOnError;
+    _throttledLogger = new ThrottledLogger(LOGGER, 0.0);
   }
 
   @VisibleForTesting
@@ -233,7 +237,7 @@ public class ComplexTypeTransformer implements 
RecordTransformer {
         if (!_continueOnError) {
           throw new RuntimeException("Caught exception while transforming 
complex types", e);
         }
-        LOGGER.debug("Caught exception while transforming complex types for 
record: {}", record.toString(), e);
+        _throttledLogger.warn("Caught exception while transforming complex 
types", e);
         record.markIncomplete();
       }
     }
@@ -245,7 +249,7 @@ public class ComplexTypeTransformer implements 
RecordTransformer {
           if (!_continueOnError) {
             throw new RuntimeException("Caught exception while renaming 
prefixes", e);
           }
-          LOGGER.debug("Caught exception while renaming prefixes for record: 
{}", record.toString(), e);
+          _throttledLogger.warn("Caught exception while renaming prefixes", e);
           record.markIncomplete();
         }
       }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
index 1a69069945e..1c91136602e 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.common.utils.ThrottledLogger;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -50,6 +51,7 @@ public class DataTypeTransformer implements RecordTransformer 
{
 
   private final Map<String, PinotDataType> _dataTypes = new HashMap<>();
   private final boolean _continueOnError;
+  private final ThrottledLogger _throttledLogger;
 
   public DataTypeTransformer(TableConfig tableConfig, Schema schema) {
     for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
@@ -59,6 +61,7 @@ public class DataTypeTransformer implements RecordTransformer 
{
     }
     IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
     _continueOnError = ingestionConfig != null && 
ingestionConfig.isContinueOnError();
+    _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
   }
 
   @Override
@@ -121,7 +124,7 @@ public class DataTypeTransformer implements 
RecordTransformer {
         if (!_continueOnError) {
           throw new RuntimeException("Caught exception while transforming data 
type for column: " + column, e);
         }
-        LOGGER.debug("Caught exception while transforming data type for 
column: {}", column, e);
+        _throttledLogger.warn("Caught exception while transforming data type 
for column: " + column, e);
         record.putValue(column, null);
         record.markIncomplete();
       }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
index c9bfe192f77..7c62372f409 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
@@ -27,6 +27,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.pinot.common.utils.ThrottledLogger;
 import org.apache.pinot.segment.local.function.FunctionEvaluator;
 import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -52,6 +53,7 @@ public class ExpressionTransformer implements 
RecordTransformer {
   @VisibleForTesting
   final LinkedHashMap<String, FunctionEvaluator> _expressionEvaluators = new 
LinkedHashMap<>();
   private final boolean _continueOnError;
+  private final ThrottledLogger _throttledLogger;
 
   public ExpressionTransformer(TableConfig tableConfig, Schema schema) {
     Map<String, FunctionEvaluator> expressionEvaluators = new HashMap<>();
@@ -88,6 +90,7 @@ public class ExpressionTransformer implements 
RecordTransformer {
     }
 
     _continueOnError = ingestionConfig != null && 
ingestionConfig.isContinueOnError();
+    _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
   }
 
   private void topologicalSort(String column, Map<String, FunctionEvaluator> 
expressionEvaluators,
@@ -148,7 +151,7 @@ public class ExpressionTransformer implements 
RecordTransformer {
           if (!_continueOnError) {
             throw new RuntimeException("Caught exception while evaluation 
transform function for column: " + column, e);
           }
-          LOGGER.debug("Caught exception while evaluation transform function 
for column: {}", column, e);
+          _throttledLogger.warn("Caught exception while evaluation transform 
function for column: " + column, e);
           record.markIncomplete();
         }
       } else if (existingValue.getClass().isArray() || existingValue 
instanceof Collections
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/FilterTransformer.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/FilterTransformer.java
index 79915c2b894..ebf1cd72ca0 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/FilterTransformer.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/FilterTransformer.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.recordtransformer;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.pinot.common.utils.ThrottledLogger;
 import org.apache.pinot.segment.local.function.FunctionEvaluator;
 import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -40,6 +41,7 @@ public class FilterTransformer implements RecordTransformer {
   private final String _filterFunction;
   private final FunctionEvaluator _evaluator;
   private final boolean _continueOnError;
+  private final ThrottledLogger _throttledLogger;
 
   private long _numRecordsFiltered;
 
@@ -52,6 +54,7 @@ public class FilterTransformer implements RecordTransformer {
     }
     _evaluator = _filterFunction != null ? 
FunctionEvaluatorFactory.getExpressionEvaluator(_filterFunction) : null;
     _continueOnError = ingestionConfig != null && 
ingestionConfig.isContinueOnError();
+    _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
   }
 
   @Override
@@ -79,11 +82,10 @@ public class FilterTransformer implements RecordTransformer 
{
       } catch (Exception e) {
         if (!_continueOnError) {
           throw new RuntimeException(
-              String.format("Caught exception while executing filter function: 
%s for record: %s", _filterFunction,
-                  record.toString()), e);
+              String.format("Caught exception while executing filter function: 
%s", _filterFunction), e);
         } else {
-          LOGGER.debug("Caught exception while executing filter function: {} 
for record: {}", _filterFunction,
-              record.toString(), e);
+          _throttledLogger.warn(
+              String.format("Caught exception while executing filter function: 
%s", _filterFunction), e);
           record.markIncomplete();
           filteredRecords.add(record);
         }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java
index 59d9a044e09..6663bf4116a 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java
@@ -34,6 +34,7 @@ import javax.annotation.Nullable;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.ThrottledLogger;
 import org.apache.pinot.segment.local.utils.Base64Utils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
@@ -114,6 +115,7 @@ public class SchemaConformingTransformer implements 
RecordTransformer {
   private final int _jsonKeyValueSeparatorByteCount;
   private final boolean _continueOnError;
   private final ServerMetrics _serverMetrics;
+  private final ThrottledLogger _throttledLogger;
 
   private final GenericRow _reusedOutputRecord = new GenericRow();
   private final Map<String, Object> _reusedMergedTextIndexMap = new 
HashMap<>();
@@ -143,6 +145,7 @@ public class SchemaConformingTransformer implements 
RecordTransformer {
         
_transformerConfig.getJsonKeyValueSeparator().getBytes(StandardCharsets.UTF_8).length;
     _continueOnError = ingestionConfig.isContinueOnError();
     _serverMetrics = ServerMetrics.get();
+    _throttledLogger = new ThrottledLogger(_logger, ingestionConfig);
   }
 
   /// Returns a [ComplexTypeTransformer] if it is defined in the table config, 
`null` otherwise.
@@ -377,7 +380,7 @@ public class SchemaConformingTransformer implements RecordTransformer {
       if (!_continueOnError) {
         throw e;
       }
-      _logger.debug("Couldn't transform record: {}", record.toString(), e);
+      _throttledLogger.warn("Couldn't transform record", e);
       outputRecord.markIncomplete();
     }
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/TimeValidationTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/TimeValidationTransformer.java
index 613978439d3..a1c5fa71a3b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/TimeValidationTransformer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/TimeValidationTransformer.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.recordtransformer;
 
 import com.google.common.base.Preconditions;
+import org.apache.pinot.common.utils.ThrottledLogger;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
@@ -46,6 +47,7 @@ public class TimeValidationTransformer implements RecordTransformer {
   private final DateTimeFormatSpec _timeFormatSpec;
   private final boolean _enableTimeValueCheck;
   private final boolean _continueOnError;
+  private final ThrottledLogger _throttledLogger;
 
   public TimeValidationTransformer(TableConfig tableConfig, Schema schema) {
     _timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
@@ -61,6 +63,7 @@ public class TimeValidationTransformer implements RecordTransformer {
       _timeFormatSpec = null;
       _continueOnError = false;
     }
+    _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
   }
 
   @Override
@@ -83,7 +86,7 @@ public class TimeValidationTransformer implements RecordTransformer {
       if (!_continueOnError) {
         throw new IllegalStateException(errorMessage, e);
       }
-      LOGGER.debug(errorMessage, e);
+      _throttledLogger.warn(errorMessage, e);
       record.putValue(_timeColumnName, null);
       record.markIncomplete();
       return;
@@ -95,7 +98,7 @@ public class TimeValidationTransformer implements RecordTransformer {
       if (!_continueOnError) {
         throw new IllegalStateException(errorMessage);
       }
-      LOGGER.debug(errorMessage);
+      _throttledLogger.warn(errorMessage, new IllegalStateException(errorMessage));
       record.putValue(_timeColumnName, null);
       record.markIncomplete();
     }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
index 2c3fc664b87..e44d0b89246 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonPropertyDescription;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.config.BaseJsonConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
 
 
 /**
@@ -76,6 +77,10 @@ public class IngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Configs related to check time value for segment")
   private boolean _segmentTimeValueCheck = true;
 
+  @JsonPropertyDescription("Max exception logs per minute per exception class (0 to disable)")
+  private int _ingestionExceptionLogRateLimitPerMin =
+      CommonConstants.IngestionConfigs.DEFAULT_INGESTION_EXCEPTION_LOG_RATE_LIMIT_PER_MIN;
+
   @Deprecated
   public IngestionConfig(@Nullable BatchIngestionConfig batchIngestionConfig,
       @Nullable StreamIngestionConfig streamIngestionConfig, @Nullable FilterConfig filterConfig,
@@ -152,6 +157,10 @@ public class IngestionConfig extends BaseJsonConfig {
     return _segmentTimeValueCheck;
   }
 
+  public int getIngestionExceptionLogRateLimitPerMin() {
+    return _ingestionExceptionLogRateLimitPerMin;
+  }
+
   public void setBatchIngestionConfig(BatchIngestionConfig batchIngestionConfig) {
     _batchIngestionConfig = batchIngestionConfig;
   }
@@ -200,4 +209,8 @@ public class IngestionConfig extends BaseJsonConfig {
   public void setSegmentTimeValueCheck(boolean segmentTimeValueCheck) {
     _segmentTimeValueCheck = segmentTimeValueCheck;
   }
+
+  public void setIngestionExceptionLogRateLimitPerMin(int ingestionExceptionLogRateLimitPerMin) {
+    _ingestionExceptionLogRateLimitPerMin = ingestionExceptionLogRateLimitPerMin;
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index db3adb17963..f9bde07224d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -2091,6 +2091,10 @@ public class CommonConstants {
         "pinot.field.spec.default.json.max.length";
   }
 
+  public static class IngestionConfigs {
+    public static final int DEFAULT_INGESTION_EXCEPTION_LOG_RATE_LIMIT_PER_MIN = 5;
+  }
+
   /**
    * Configuration for setting up groovy static analyzer.
    * User can config different configuration for query and ingestion (table creation and update) static analyzer.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to