This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9648c99cf5a Refactor schema hash naming in KafkaAvroMessageDecoder. 
(#17798)
9648c99cf5a is described below

commit 9648c99cf5a229d96b42ca456c167d9935e85270
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Mar 2 23:36:41 2026 -0800

    Refactor schema hash naming in KafkaAvroMessageDecoder. (#17798)
    
    This removes MD5-specific wording from internal identifiers and comments in 
the pinot-avro decoder to align terminology with FIPS-compliant hash usage 
without changing behavior.
    
    Made-with: Cursor
---
 .../inputformat/avro/KafkaAvroMessageDecoder.java  | 92 +++++++++++-----------
 1 file changed, 45 insertions(+), 47 deletions(-)

diff --git 
a/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java
 
b/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java
index c21dbe2356a..56ba6958486 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java
@@ -34,6 +34,7 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericDatumReader;
@@ -59,16 +60,16 @@ public class KafkaAvroMessageDecoder implements 
StreamMessageDecoder<byte[]> {
 
   private static final String SCHEMA_REGISTRY_REST_URL = 
"schema.registry.rest.url";
   private static final String SCHEMA_REGISTRY_SCHEMA_NAME = 
"schema.registry.schema.name";
-  private org.apache.avro.Schema _defaultAvroSchema;
-  private MD5AvroSchemaMap _md5ToAvroSchemaMap;
+  private Schema _defaultAvroSchema;
+  private HashToSchemaMap _hashToSchemaMap;
 
   // A global cache for schemas across all threads.
-  private static final Map<String, org.apache.avro.Schema> GLOBAL_SCHEMA_CACHE 
= new HashMap<>();
+  private static final Map<String, Schema> GLOBAL_SCHEMA_CACHE = new 
HashMap<>();
   // Suffix for getting the latest schema
   private static final String LATEST = "-latest";
 
-  // Reusable byte[] to read MD5 from payload. This is OK as this class is 
used only by a single thread.
-  private final byte[] _reusableMD5Bytes = new byte[SCHEMA_HASH_LENGTH];
+  // Reusable byte[] to read hash from payload. This class is used only by a 
single thread.
+  private final byte[] _reusableHashBytes = new byte[SCHEMA_HASH_LENGTH];
 
   private DecoderFactory _decoderFactory;
   private RecordExtractor<GenericData.Record> _avroRecordExtractor;
@@ -98,9 +99,9 @@ public class KafkaAvroMessageDecoder implements 
StreamMessageDecoder<byte[]> {
     // With the logic below, we may not set defaultAvroSchema to be the latest 
one every time.
     // The schema is fetched once when the machine starts. Until the next 
restart, the latest schema is
     // not fetched.
-    // But then we always pay attention to the exact MD5 hash and attempt to 
fetch the schema for that particular hash
-    // before decoding an incoming kafka event. We use defaultAvroSchema only 
if the fetch for the particular MD5 fails,
-    // but then we will retry that fetch on every event in case of failure.
+    // But then we always pay attention to the exact hash and attempt to fetch 
the schema for that particular hash
+    // before decoding an incoming kafka event. We use defaultAvroSchema only 
if the fetch for the particular hash
+    // fails, but then we will retry that fetch on every event in case of 
failure.
     synchronized (GLOBAL_SCHEMA_CACHE) {
       final String hashKey = avroSchemaName + LATEST;
       _defaultAvroSchema = GLOBAL_SCHEMA_CACHE.get(hashKey);
@@ -125,7 +126,7 @@ public class KafkaAvroMessageDecoder implements 
StreamMessageDecoder<byte[]> {
     _avroRecordExtractor = 
PluginManager.get().createInstance(recordExtractorClass);
     _avroRecordExtractor.init(fieldsToRead, config);
     _decoderFactory = new DecoderFactory();
-    _md5ToAvroSchemaMap = new MD5AvroSchemaMap();
+    _hashToSchemaMap = new HashToSchemaMap();
   }
 
   @Nullable
@@ -142,25 +143,25 @@ public class KafkaAvroMessageDecoder implements 
StreamMessageDecoder<byte[]> {
       return null;
     }
 
-    System.arraycopy(payload, SCHEMA_HASH_START_OFFSET + offset, 
_reusableMD5Bytes, 0, SCHEMA_HASH_LENGTH);
+    System.arraycopy(payload, SCHEMA_HASH_START_OFFSET + offset, 
_reusableHashBytes, 0, SCHEMA_HASH_LENGTH);
 
     boolean schemaUpdateFailed = false;
-    org.apache.avro.Schema schema = 
_md5ToAvroSchemaMap.getSchema(_reusableMD5Bytes);
+    Schema schema = _hashToSchemaMap.getSchema(_reusableHashBytes);
     if (schema == null) {
       // We will get here for the first row consumed in the segment, and every 
row that has a schema ID that is
-      // not yet in md5ToAvroSchemaMap.
+      // not yet in hashToSchemaMap.
       synchronized (GLOBAL_SCHEMA_CACHE) {
-        final String hashKey = hex(_reusableMD5Bytes);
+        final String hashKey = hex(_reusableHashBytes);
         schema = GLOBAL_SCHEMA_CACHE.get(hashKey);
         if (schema == null) {
           // We will get here only if no partition of the table has populated 
the global schema cache.
           // In that case, one of the consumers will fetch the schema and 
populate the cache, and the others
-          // should find it in the cache and po
-          final String schemaUri = "/id=" + hex(_reusableMD5Bytes);
+          // should find it in the cache and populate their local schema maps.
+          final String schemaUri = "/id=" + hex(_reusableHashBytes);
           try {
             schema = fetchSchema(schemaUri);
             GLOBAL_SCHEMA_CACHE.put(hashKey, schema);
-            _md5ToAvroSchemaMap.addSchema(_reusableMD5Bytes, schema);
+            _hashToSchemaMap.addSchema(_reusableHashBytes, schema);
           } catch (Exception e) {
             schema = _defaultAvroSchema;
             LOGGER.error("Error fetching schema using url {}. Attempting to 
continue with previous schema", schemaUri,
@@ -169,7 +170,7 @@ public class KafkaAvroMessageDecoder implements 
StreamMessageDecoder<byte[]> {
           }
         } else {
           LOGGER.info("Found schema for {} in cache", hashKey);
-          _md5ToAvroSchemaMap.addSchema(_reusableMD5Bytes, schema);
+          _hashToSchemaMap.addSchema(_reusableHashBytes, schema);
         }
       }
     }
@@ -198,22 +199,20 @@ public class KafkaAvroMessageDecoder implements 
StreamMessageDecoder<byte[]> {
   }
 
   private static class SchemaFetcher implements Callable<Boolean> {
-    private org.apache.avro.Schema _schema;
-    private URL _url;
-    private boolean _isSuccessful = false;
+    private Schema _schema;
+    private final URL _url;
 
     SchemaFetcher(URL url) {
       _url = url;
     }
 
     @Override
-    public Boolean call()
-        throws Exception {
+    public Boolean call() {
       try {
         URLConnection conn = _url.openConnection();
         conn.setConnectTimeout(15000);
         conn.setReadTimeout(15000);
-        LOGGER.info("Fetching schema using url {}", _url.toString());
+        LOGGER.info("Fetching schema using url {}", _url);
 
         StringBuilder queryResp = new StringBuilder();
         try (BufferedReader reader = new BufferedReader(
@@ -222,10 +221,9 @@ public class KafkaAvroMessageDecoder implements 
StreamMessageDecoder<byte[]> {
             queryResp.append(line);
           }
         }
+        _schema = Schema.parse(queryResp.toString());
 
-        _schema = org.apache.avro.Schema.parse(queryResp.toString());
-
-        LOGGER.info("Schema fetch succeeded on url {}", _url.toString());
+        LOGGER.info("Schema fetch succeeded on url {}", _url);
         return Boolean.TRUE;
       } catch (Exception e) {
         LOGGER.warn("Caught exception while fetching schema", e);
@@ -233,12 +231,12 @@ public class KafkaAvroMessageDecoder implements 
StreamMessageDecoder<byte[]> {
       }
     }
 
-    public org.apache.avro.Schema getSchema() {
+    public Schema getSchema() {
       return _schema;
     }
   }
 
-  private org.apache.avro.Schema fetchSchema(String reference)
+  private Schema fetchSchema(String reference)
       throws Exception {
     SchemaFetcher schemaFetcher = new SchemaFetcher(makeRandomUrl(reference));
     RetryPolicies
@@ -248,33 +246,33 @@ public class KafkaAvroMessageDecoder implements 
StreamMessageDecoder<byte[]> {
   }
 
   /**
-   * Private class for encapsulating MD5 to Avro schema mapping.
+   * Private class for encapsulating hash to Avro schema mapping.
    * <ul>
-   *   <li> Maintains two lists, one for md5s and another for schema. </li>
-   *   <li> MD5 at index i in the MD5 list, corresponds to Schema at index i 
in the schema list. </li>
+   *   <li> Maintains two lists, one for hashes and another for schema. </li>
+   *   <li> Hash at index i in the hash list corresponds to schema at index i 
in the schema list. </li>
    * </ul>
    */
-  private static class MD5AvroSchemaMap {
-    private List<byte[]> _md5s;
-    private List<org.apache.avro.Schema> _schemas;
+  private static class HashToSchemaMap {
+    private final List<byte[]> _hashes;
+    private final List<Schema> _schemas;
 
     /**
      * Constructor for the class.
      */
-    private MD5AvroSchemaMap() {
-      _md5s = new ArrayList<>();
+    private HashToSchemaMap() {
+      _hashes = new ArrayList<>();
       _schemas = new ArrayList<>();
     }
 
     /**
-     * Returns the Avro schema corresponding to the given MD5.
+     * Returns the Avro schema corresponding to the given hash.
      *
-     * @param md5ForSchema MD5 for which to get the avro schema.
-     * @return Avro schema for the given MD5.
+     * @param hash Hash for which to get the avro schema.
+     * @return Avro schema for the given hash.
      */
-    private org.apache.avro.Schema getSchema(byte[] md5ForSchema) {
-      for (int i = 0; i < _md5s.size(); i++) {
-        if (Arrays.equals(_md5s.get(i), md5ForSchema)) {
+    private Schema getSchema(byte[] hash) {
+      for (int i = 0; i < _hashes.size(); i++) {
+        if (Arrays.equals(_hashes.get(i), hash)) {
           return _schemas.get(i);
         }
       }
@@ -282,14 +280,14 @@ public class KafkaAvroMessageDecoder implements 
StreamMessageDecoder<byte[]> {
     }
 
     /**
-     * Adds mapping between MD5 and Avro schema.
-     * Caller to ensure that addSchema is called only once per MD5-Schema pair.
+     * Adds mapping between hash and avro schema.
+     * Caller to ensure that addSchema is called only once per hash-schema 
pair.
      *
-     * @param md5 MD5 for the Schema
+     * @param hash Hash for the schema
      * @param schema Avro Schema
      */
-    private void addSchema(byte[] md5, org.apache.avro.Schema schema) {
-      _md5s.add(Arrays.copyOf(md5, md5.length));
+    private void addSchema(byte[] hash, Schema schema) {
+      _hashes.add(Arrays.copyOf(hash, hash.length));
       _schemas.add(schema);
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to