This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new be6dd7e092 add instrumentation to json index 
getMatchingFlattenedDocsMap() (#13164)
be6dd7e092 is described below

commit be6dd7e092435444c2d0366c12478653705b47e3
Author: Christopher Peck <[email protected]>
AuthorDate: Thu May 23 15:08:10 2024 -0700

    add instrumentation to json index getMatchingFlattenedDocsMap() (#13164)
---
 .../JsonExtractIndexTransformFunction.java         |  56 ++++++-----
 .../accounting/ResourceManagerAccountingTest.java  | 110 +++++++++++++++++++++
 .../realtime/impl/json/MutableJsonIndexImpl.java   |   2 +
 .../readers/json/ImmutableJsonIndexReader.java     |   2 +
 4 files changed, 147 insertions(+), 23 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractIndexTransformFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractIndexTransformFunction.java
index b499b7384c..ffc835e7cb 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractIndexTransformFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractIndexTransformFunction.java
@@ -48,6 +48,8 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
   private JsonIndexReader _jsonIndexReader;
   private Object _defaultValue;
   private Map<String, RoaringBitmap> _valueToMatchingDocsMap;
+  private boolean _isSingleValue;
+  private String _filterJsonPath;
 
   @Override
   public String getName() {
@@ -91,12 +93,12 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
       throw new IllegalArgumentException("Result type argument must be a 
literal");
     }
     String resultsType = ((LiteralTransformFunction) 
thirdArgument).getStringLiteral().toUpperCase();
-    boolean isSingleValue = !resultsType.endsWith("_ARRAY");
-    if (isSingleValue && _jsonPathString.contains("[*]")) {
+    _isSingleValue = !resultsType.endsWith("_ARRAY");
+    if (_isSingleValue && _jsonPathString.contains("[*]")) {
       throw new IllegalArgumentException(
           "[*] syntax in json path is unsupported for singleValue field 
json_extract_index");
     }
-    DataType dataType = isSingleValue ? DataType.valueOf(resultsType)
+    DataType dataType = _isSingleValue ? DataType.valueOf(resultsType)
         : DataType.valueOf(resultsType.substring(0, resultsType.length() - 6));
 
     if (arguments.size() >= 4) {
@@ -105,7 +107,7 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
         throw new IllegalArgumentException("Default value must be a literal");
       }
 
-      if (isSingleValue) {
+      if (_isSingleValue) {
         _defaultValue = dataType.convert(((LiteralTransformFunction) 
fourthArgument).getStringLiteral());
       } else {
         try {
@@ -124,21 +126,15 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
       }
     }
 
-    String filterJsonPath = null;
     if (arguments.size() == 5) {
       TransformFunction fifthArgument = arguments.get(4);
       if (!(fifthArgument instanceof LiteralTransformFunction)) {
         throw new IllegalArgumentException("JSON path filter argument must be 
a literal");
       }
-      filterJsonPath = ((LiteralTransformFunction) 
fifthArgument).getStringLiteral();
+      _filterJsonPath = ((LiteralTransformFunction) 
fifthArgument).getStringLiteral();
     }
 
-    _resultMetadata = new TransformResultMetadata(dataType, isSingleValue, 
false);
-    _valueToMatchingDocsMap = 
_jsonIndexReader.getMatchingFlattenedDocsMap(_jsonPathString, filterJsonPath);
-    if (isSingleValue) {
-      // For single value result type, it's more efficient to use original 
docIDs map
-      _jsonIndexReader.convertFlattenedDocIdsToDocIds(_valueToMatchingDocsMap);
-    }
+    _resultMetadata = new TransformResultMetadata(dataType, _isSingleValue, 
false);
   }
 
   @Override
@@ -152,7 +148,7 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
     int[] inputDocIds = valueBlock.getDocIds();
     initIntValuesSV(numDocs);
     String[] valuesFromIndex = 
_jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
-        _valueToMatchingDocsMap, false);
+        getValueToMatchingDocsMap(), false);
     for (int i = 0; i < numDocs; i++) {
       String value = valuesFromIndex[inputDocIds[i]];
       if (value == null) {
@@ -174,7 +170,7 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
     int[] inputDocIds = valueBlock.getDocIds();
     initLongValuesSV(numDocs);
     String[] valuesFromIndex = 
_jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
-        _valueToMatchingDocsMap, false);
+        getValueToMatchingDocsMap(), false);
     for (int i = 0; i < numDocs; i++) {
       String value = valuesFromIndex[i];
       if (value == null) {
@@ -196,7 +192,7 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
     int[] inputDocIds = valueBlock.getDocIds();
     initFloatValuesSV(numDocs);
     String[] valuesFromIndex = 
_jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
-        _valueToMatchingDocsMap, false);
+        getValueToMatchingDocsMap(), false);
     for (int i = 0; i < numDocs; i++) {
       String value = valuesFromIndex[i];
       if (value == null) {
@@ -218,7 +214,7 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
     int[] inputDocIds = valueBlock.getDocIds();
     initDoubleValuesSV(numDocs);
     String[] valuesFromIndex = 
_jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
-        _valueToMatchingDocsMap, false);
+        getValueToMatchingDocsMap(), false);
     for (int i = 0; i < numDocs; i++) {
       String value = valuesFromIndex[i];
       if (value == null) {
@@ -240,7 +236,7 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
     int[] inputDocIds = valueBlock.getDocIds();
     initBigDecimalValuesSV(numDocs);
     String[] valuesFromIndex = 
_jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
-        _valueToMatchingDocsMap, false);
+        getValueToMatchingDocsMap(), false);
     for (int i = 0; i < numDocs; i++) {
       String value = valuesFromIndex[i];
       if (value == null) {
@@ -262,7 +258,7 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
     int[] inputDocIds = valueBlock.getDocIds();
     initStringValuesSV(numDocs);
     String[] valuesFromIndex = 
_jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
-        _valueToMatchingDocsMap, false);
+        getValueToMatchingDocsMap(), false);
     for (int i = 0; i < numDocs; i++) {
       String value = valuesFromIndex[i];
       if (value == null) {
@@ -283,7 +279,7 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
     int numDocs = valueBlock.getNumDocs();
     initIntValuesMV(numDocs);
     String[][] valuesFromIndex = 
_jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
-        _valueToMatchingDocsMap);
+        getValueToMatchingDocsMap());
 
     for (int i = 0; i < numDocs; i++) {
       String[] value = valuesFromIndex[i];
@@ -311,7 +307,7 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
     int numDocs = valueBlock.getNumDocs();
     initLongValuesMV(numDocs);
     String[][] valuesFromIndex = 
_jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
-        _valueToMatchingDocsMap);
+        getValueToMatchingDocsMap());
     for (int i = 0; i < numDocs; i++) {
       String[] value = valuesFromIndex[i];
       if (value.length == 0) {
@@ -338,7 +334,7 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
     int numDocs = valueBlock.getNumDocs();
     initFloatValuesMV(numDocs);
     String[][] valuesFromIndex = 
_jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
-        _valueToMatchingDocsMap);
+        getValueToMatchingDocsMap());
     for (int i = 0; i < numDocs; i++) {
       String[] value = valuesFromIndex[i];
       if (value.length == 0) {
@@ -365,7 +361,7 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
     int numDocs = valueBlock.getNumDocs();
     initDoubleValuesMV(numDocs);
     String[][] valuesFromIndex = 
_jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
-        _valueToMatchingDocsMap);
+        getValueToMatchingDocsMap());
     for (int i = 0; i < numDocs; i++) {
       String[] value = valuesFromIndex[i];
       if (value.length == 0) {
@@ -392,7 +388,7 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
     int numDocs = valueBlock.getNumDocs();
     initStringValuesMV(numDocs);
     String[][] valuesFromIndex = 
_jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
-        _valueToMatchingDocsMap);
+        getValueToMatchingDocsMap());
     for (int i = 0; i < numDocs; i++) {
       String[] value = valuesFromIndex[i];
       if (value.length == 0) {
@@ -411,4 +407,18 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
     }
     return _stringValuesMV;
   }
+
+  /**
+   * Lazily initialize _valueToMatchingDocsMap, so that map generation is 
skipped when filtering excludes all values
+   */
+  private Map<String, RoaringBitmap> getValueToMatchingDocsMap() {
+    if (_valueToMatchingDocsMap == null) {
+      _valueToMatchingDocsMap = 
_jsonIndexReader.getMatchingFlattenedDocsMap(_jsonPathString, _filterJsonPath);
+      if (_isSingleValue) {
+        // For single value result type, it's more efficient to use original 
docIDs map
+        
_jsonIndexReader.convertFlattenedDocIdsToDocIds(_valueToMatchingDocsMap);
+      }
+    }
+    return _valueToMatchingDocsMap;
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
index 469989843c..f8c0e4562d 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
@@ -18,17 +18,21 @@
  */
 package org.apache.pinot.core.accounting;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.pinot.common.datatable.DataTable;
@@ -47,8 +51,15 @@ import 
org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
 import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService;
 import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.segment.local.realtime.impl.json.MutableJsonIndexImpl;
+import 
org.apache.pinot.segment.local.segment.creator.impl.inv.json.OffHeapJsonIndexCreator;
+import 
org.apache.pinot.segment.local.segment.index.readers.json.ImmutableJsonIndexReader;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.apache.pinot.spi.trace.Tracing;
@@ -368,6 +379,105 @@ public class ResourceManagerAccountingTest {
     Assert.assertTrue(earlyTerminationOccurred.get());
   }
 
+  /**
+   * Test instrumentation in getMatchingFlattenedDocsMap() from
+   * {@link org.apache.pinot.segment.spi.index.reader.JsonIndexReader}
+   *
+   * Since getMatchingFlattenedDocsMap() can collect a large map before 
processing any blocks, it is required to
+   * check for OOM during map generation. This test generates a mutable and 
immutable json index, and generates a map
+   * as would happen in json_extract_index execution.
+   *
+   * It is roughly equivalent to running json_extract_index(col, '$.key', 
'STRING').
+   */
+  @Test
+  public void testJsonIndexExtractMapOOM()
+      throws Exception {
+    HashMap<String, Object> configs = new HashMap<>();
+    ServerMetrics.register(Mockito.mock(ServerMetrics.class));
+    ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
+    
LogManager.getLogger(PerQueryCPUMemResourceUsageAccountant.class).setLevel(Level.OFF);
+    
LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.OFF);
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO,
 0.00f);
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO,
 0.00f);
+    configs.put(CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME,
+        "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, 
true);
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, 
false);
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, 
true);
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO,
 0.00f);
+
+    PinotConfiguration config = getConfig(2, 2, configs);
+    ResourceManager rm = getResourceManager(2, 2, 1, 1, configs);
+    // init accountant and start watcher task
+    Tracing.ThreadAccountantOps.initializeThreadAccountant(config, 
"testJsonIndexExtractMapOOM");
+
+    Supplier<String> randomJsonValue = () -> {
+      Random random = new Random();
+      int length = random.nextInt(1000);
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < length; i++) {
+        sb.append((char) (random.nextInt(26) + 'a'));
+      }
+      return "{\"key\":\"" + sb + "\"}";
+    };
+
+    File indexDir = new File(FileUtils.getTempDirectory(), 
"testJsonIndexExtractMapOOM");
+    FileUtils.forceMkdir(indexDir);
+    String colName = "col";
+    try (JsonIndexCreator offHeapIndexCreator = new 
OffHeapJsonIndexCreator(indexDir, colName, new JsonIndexConfig());
+        MutableJsonIndexImpl mutableJsonIndex = new MutableJsonIndexImpl(new 
JsonIndexConfig())) {
+      // build json indexes
+      for (int i = 0; i < 1000000; i++) {
+        String val = randomJsonValue.get();
+        offHeapIndexCreator.add(val);
+        mutableJsonIndex.add(val);
+      }
+      offHeapIndexCreator.seal();
+
+      CountDownLatch latch = new CountDownLatch(2);
+      AtomicBoolean mutableEarlyTerminationOccurred = new AtomicBoolean(false);
+
+      // test mutable json index .getMatchingFlattenedDocsMap()
+      rm.getQueryRunners().submit(() -> {
+        Tracing.ThreadAccountantOps.setupRunner("testJsonExtractIndexId1");
+        try {
+          mutableJsonIndex.getMatchingFlattenedDocsMap("key", null);
+        } catch (EarlyTerminationException e) {
+          mutableEarlyTerminationOccurred.set(true);
+          Tracing.ThreadAccountantOps.clear();
+        } finally {
+          latch.countDown();
+        }
+      });
+
+      // test immutable json index .getMatchingFlattenedDocsMap()
+      File indexFile = new File(indexDir, colName + 
V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+      AtomicBoolean immutableEarlyTerminationOccurred = new 
AtomicBoolean(false);
+      rm.getQueryRunners().submit(() -> {
+        Tracing.ThreadAccountantOps.setupRunner("testJsonExtractIndexId2");
+        try {
+          try (PinotDataBuffer offHeapDataBuffer = 
PinotDataBuffer.mapReadOnlyBigEndianFile(indexFile);
+              ImmutableJsonIndexReader offHeapIndexReader = new 
ImmutableJsonIndexReader(offHeapDataBuffer, 1000000)) {
+            offHeapIndexReader.getMatchingFlattenedDocsMap("key", null);
+          } catch (IOException e) {
+            Assert.fail("failed .getMatchingFlattenedDocsMap for the immutable 
json index");
+          }
+        } catch (EarlyTerminationException e) {
+          immutableEarlyTerminationOccurred.set(true);
+          Tracing.ThreadAccountantOps.clear();
+        } finally {
+          latch.countDown();
+        }
+      });
+
+      latch.await();
+      Assert.assertTrue(mutableEarlyTerminationOccurred.get(),
+          "Expected early termination reading the mutable index");
+      Assert.assertTrue(immutableEarlyTerminationOccurred.get(),
+          "Expected early termination reading the immutable index");
+    }
+  }
+
   /**
    * Test thread memory usage tracking and query killing in multi-thread 
environment, add @Test to run.
    */
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
index 6e46120aae..23de292693 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
@@ -50,6 +50,7 @@ import 
org.apache.pinot.segment.spi.index.mutable.MutableJsonIndex;
 import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.roaringbitmap.IntConsumer;
@@ -482,6 +483,7 @@ public class MutableJsonIndexImpl implements 
MutableJsonIndex {
         }
         if (!flattenedDocIds.isEmpty()) {
           
valueToMatchingFlattenedDocIdsMap.put(entry.getKey().substring(jsonPathKey.length()
 + 1), flattenedDocIds);
+          
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(valueToMatchingFlattenedDocIdsMap.size());
         }
       }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
index 3500b11172..e94eee4170 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
@@ -48,6 +48,7 @@ import 
org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.roaringbitmap.IntConsumer;
@@ -458,6 +459,7 @@ public class ImmutableJsonIndexReader implements 
JsonIndexReader {
 
       if (!docIds.isEmpty()) {
         result.put(key.substring(jsonPathKey.length() + 1), docIds);
+        
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(result.size());
       }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to