Copilot commented on code in PR #17593:
URL: https://github.com/apache/pinot/pull/17593#discussion_r2740484550


##########
pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java:
##########
@@ -34,21 +34,77 @@
 
 /**
  * Extractor for ProtoBuf records
+ *
+ * <p>Performance optimizations:
+ * <ul>
+ *   <li>Field descriptors are cached during initialization to avoid repeated 
lookups</li>
+ *   <li>A reusable ProtoBufFieldInfo instance is used to reduce object 
allocation</li>
+ * </ul>
  */
 @SuppressWarnings("unchecked")
 public class ProtoBufRecordExtractor extends BaseRecordExtractor<Message> {
 
   private Set<String> _fields;
   private boolean _extractAll = false;
 
+  // Cached field descriptors to avoid repeated lookups via findFieldByName
+  private Descriptors.FieldDescriptor[] _cachedFieldDescriptors;
+  private String[] _cachedFieldNames;
+  // Store the descriptor's full name to detect schema changes
+  private String _cachedDescriptorFullName;

Review Comment:
   The comment on line 50 should clarify that these caches are initialized 
lazily on first extraction rather than during `init()`. Consider rephrasing to: 
'Cached field descriptors initialized lazily on first message extraction to 
avoid repeated lookups via findFieldByName'.



##########
pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractorCachingTest.java:
##########
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.protobuf;
+
+import com.google.protobuf.Message;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.pinot.plugin.inputformat.protobuf.ProtoBufTestDataGenerator.*;
+import static org.testng.Assert.*;
+
+
+/**
+ * Comprehensive tests for {@link ProtoBufRecordExtractor} field descriptor 
caching and optimization behavior.
+ *
+ * <p>These tests verify:
+ * <ul>
+ *   <li>Field descriptor caching works correctly across multiple 
extractions</li>
+ *   <li>Subset field extraction works correctly</li>
+ *   <li>Schema change detection and cache invalidation</li>
+ *   <li>Re-initialization behavior</li>
+ *   <li>Edge cases like missing fields, empty fields set</li>
+ * </ul>
+ */
+public class ProtoBufRecordExtractorCachingTest {
+
+  private ProtoBufRecordExtractor _extractor;
+  private GenericRow _reusableRow;
+
+  @BeforeMethod
+  public void setUp() {
+    _extractor = new ProtoBufRecordExtractor();
+    _reusableRow = new GenericRow();
+  }
+
+  // ==================== CACHING BEHAVIOR TESTS ====================
+
+  @Test
+  public void testCachingWithMultipleExtractions()
+      throws IOException {
+    // Initialize extractor for all fields
+    _extractor.init(null, null);
+
+    // Create multiple messages and verify extraction is consistent
+    Message message1 = createTestMessage("test1", 100);
+    Message message2 = createTestMessage("test2", 200);
+    Message message3 = createTestMessage("test3", 300);
+
+    // First extraction initializes the cache
+    GenericRow row1 = _extractor.extract(message1, new GenericRow());
+    assertEquals(row1.getValue(STRING_FIELD), "test1");
+    assertEquals(row1.getValue(INT_FIELD), 100);
+
+    // Second extraction uses cached descriptors
+    GenericRow row2 = _extractor.extract(message2, new GenericRow());
+    assertEquals(row2.getValue(STRING_FIELD), "test2");
+    assertEquals(row2.getValue(INT_FIELD), 200);
+
+    // Third extraction uses cached descriptors
+    GenericRow row3 = _extractor.extract(message3, new GenericRow());
+    assertEquals(row3.getValue(STRING_FIELD), "test3");
+    assertEquals(row3.getValue(INT_FIELD), 300);
+  }
+
+  @Test
+  public void testCachingWithReusableGenericRow()
+      throws IOException {
+    _extractor.init(null, null);
+
+    Message message1 = createTestMessage("reuse1", 111);
+    Message message2 = createTestMessage("reuse2", 222);
+
+    // Extract first message
+    _reusableRow.clear();
+    _extractor.extract(message1, _reusableRow);
+    assertEquals(_reusableRow.getValue(STRING_FIELD), "reuse1");
+    assertEquals(_reusableRow.getValue(INT_FIELD), 111);
+
+    // Extract second message into same row (after clear)
+    _reusableRow.clear();
+    _extractor.extract(message2, _reusableRow);
+    assertEquals(_reusableRow.getValue(STRING_FIELD), "reuse2");
+    assertEquals(_reusableRow.getValue(INT_FIELD), 222);
+  }
+
+  // ==================== SUBSET FIELD EXTRACTION TESTS ====================
+
+  @Test
+  public void testSubsetFieldExtraction()
+      throws IOException {
+    Set<String> subsetFields = new HashSet<>(Arrays.asList(STRING_FIELD, 
INT_FIELD, LONG_FIELD));
+    _extractor.init(subsetFields, null);
+
+    Message message = createTestMessage("subset", 999);
+    GenericRow row = _extractor.extract(message, new GenericRow());
+
+    // Verify only requested fields are extracted
+    assertEquals(row.getValue(STRING_FIELD), "subset");
+    assertEquals(row.getValue(INT_FIELD), 999);
+    assertNotNull(row.getValue(LONG_FIELD));
+
+    // Verify the row contains exactly the requested fields
+    assertEquals(row.getFieldToValueMap().size(), 3);
+    assertTrue(row.getFieldToValueMap().containsKey(STRING_FIELD));
+    assertTrue(row.getFieldToValueMap().containsKey(INT_FIELD));
+    assertTrue(row.getFieldToValueMap().containsKey(LONG_FIELD));
+  }
+
+  @Test
+  public void testSubsetFieldExtractionWithMultipleMessages()
+      throws IOException {
+    Set<String> subsetFields = new HashSet<>(Arrays.asList(STRING_FIELD, 
DOUBLE_FIELD));
+    _extractor.init(subsetFields, null);
+
+    // Multiple extractions with subset fields
+    for (int i = 0; i < 10; i++) {
+      Message message = createTestMessage("iter_" + i, i * 100);
+      _reusableRow.clear();
+      _extractor.extract(message, _reusableRow);
+
+      assertEquals(_reusableRow.getValue(STRING_FIELD), "iter_" + i);
+      assertNotNull(_reusableRow.getValue(DOUBLE_FIELD));
+      assertEquals(_reusableRow.getFieldToValueMap().size(), 2);
+    }
+  }
+
+  @Test
+  public void testSubsetWithNonExistentField()
+      throws IOException {
+    // Include a field that doesn't exist in the schema
+    Set<String> subsetFields = new HashSet<>(Arrays.asList(STRING_FIELD, 
"non_existent_field"));
+    _extractor.init(subsetFields, null);
+
+    Message message = createTestMessage("test", 100);
+    GenericRow row = _extractor.extract(message, new GenericRow());
+
+    // Existing field should be extracted
+    assertEquals(row.getValue(STRING_FIELD), "test");
+    // Non-existent field should be null
+    assertNull(row.getValue("non_existent_field"));
+  }
+
+  // ==================== SCHEMA CHANGE DETECTION TESTS ====================
+
+  @Test
+  public void testSchemaChangeDetectionWithDifferentMessageTypes()
+      throws IOException {
+    _extractor.init(null, null);
+
+    // Extract from ComplexTypes.TestMessage
+    Message complexMessage = createTestMessage("complex", 100);
+    GenericRow row1 = _extractor.extract(complexMessage, new GenericRow());
+    assertEquals(row1.getValue(STRING_FIELD), "complex");
+
+    // Extract from Sample.SampleRecord (different message type)
+    Message sampleMessage = getSampleRecordMessage();
+    GenericRow row2 = _extractor.extract(sampleMessage, new GenericRow());
+
+    // Should have re-initialized cache for different message type
+    assertEquals(row2.getValue("name"), "Alice");
+    assertEquals(row2.getValue("email"), "[email protected]");
+    assertEquals(row2.getValue("id"), 18);
+  }
+
+  @Test
+  public void testExtractorReinitialization()
+      throws IOException {
+    // First initialization with all fields
+    _extractor.init(null, null);
+    Message message1 = createTestMessage("init1", 100);
+    GenericRow row1 = _extractor.extract(message1, new GenericRow());
+    int allFieldsCount = row1.getFieldToValueMap().size();
+    assertTrue(allFieldsCount > 5); // All fields from schema (27 total)

Review Comment:
   The magic number 27 in the comment could become outdated if the schema 
changes. Consider removing the specific count or making it a named constant 
that can be validated against the actual schema.
   ```suggestion
       assertTrue(allFieldsCount > 5); // All fields from schema
   ```



##########
pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkProtoBufRecordExtractor.java:
##########
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import com.github.os72.protobuf.dynamic.DynamicSchema;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder;
+import org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+
+/**
+ * JMH Benchmark for ProtoBufRecordExtractor field descriptor caching 
optimization.
+ *
+ * <p>This benchmark measures the performance improvement from:
+ * <ul>
+ *   <li>Field descriptor caching (eliminates repeated findFieldByName() 
calls)</li>
+ *   <li>Reusable ProtoBufFieldInfo object (reduces GC pressure)</li>
+ * </ul>
+ *
+ * <p>Run with:
+ * <pre>
+ * java -jar pinot-perf/target/benchmarks.jar BenchmarkProtoBufRecordExtractor
+ * </pre>
+ *
+ * <p>Or via generated script:
+ * <pre>
+ * 
./pinot-perf/target/pinot-perf-pkg/bin/pinot-BenchmarkProtoBufRecordExtractor.sh
+ * </pre>
+ */
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@Fork(1)
+@Warmup(iterations = 3, time = 3)
+@Measurement(iterations = 5, time = 5)
+@State(Scope.Benchmark)
+public class BenchmarkProtoBufRecordExtractor {
+
+  private static final String DESCRIPTOR_FILE = "complex_types.desc";
+  private static final String PROTO_CLASS_NAME = "TestMessage";
+
+  // Field name constants
+  private static final String STRING_FIELD = "string_field";
+  private static final String INT_FIELD = "int_field";
+  private static final String LONG_FIELD = "long_field";
+  private static final String DOUBLE_FIELD = "double_field";
+  private static final String FLOAT_FIELD = "float_field";
+  private static final String BOOL_FIELD = "bool_field";
+  private static final String BYTES_FIELD = "bytes_field";
+  private static final String REPEATED_STRINGS = "repeated_strings";
+  private static final String NESTED_MESSAGE = "nested_message";
+  private static final String NESTED_INT_FIELD = "nested_int_field";
+  private static final String NESTED_STRING_FIELD = "nested_string_field";
+  private static final String REPEATED_NESTED_MESSAGES = 
"repeated_nested_messages";
+  private static final String COMPLEX_MAP = "complex_map";
+  private static final String SIMPLE_MAP = "simple_map";
+  private static final String ENUM_FIELD = "enum_field";
+
+  public static void main(String[] args)
+      throws Exception {
+    ChainedOptionsBuilder opt =
+        new 
OptionsBuilder().include(BenchmarkProtoBufRecordExtractor.class.getSimpleName()).shouldDoGC(true);
+    new Runner(opt.build()).run();
+  }
+
+  @Param({"all_fields", "subset_5_fields", "single_field"})
+  private String _extractionMode;
+
+  private static final int NUM_MESSAGES = 1000;

Review Comment:
   Consider making NUM_MESSAGES configurable via a JMH parameter to allow 
testing different dataset sizes and their impact on caching effectiveness.
   ```suggestion
     @Param({"1000"})
     private int NUM_MESSAGES = 1000;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to