This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch security/pinot-avro-remove-md5-wording-clean in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 3250836669f6f0c35e179e01d54f0251c0e55a3f Author: Xiang Fu <[email protected]> AuthorDate: Mon Mar 2 17:21:58 2026 -0800 Refactor schema hash naming in KafkaAvroMessageDecoder. This removes MD5-specific wording from internal identifiers and comments in the pinot-avro decoder to align terminology with FIPS-compliant hash usage without changing behavior. Made-with: Cursor --- .../inputformat/avro/KafkaAvroMessageDecoder.java | 91 +++++++++++----------- 1 file changed, 45 insertions(+), 46 deletions(-) diff --git a/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java b/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java index c21dbe2356a..5364d19c165 100644 --- a/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java +++ b/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java @@ -34,6 +34,8 @@ import java.util.Set; import java.util.concurrent.Callable; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; +import org.apache.avro.Schema; +import org.apache.avro.SchemaParser; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericDatumReader; @@ -59,16 +61,16 @@ public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> { private static final String SCHEMA_REGISTRY_REST_URL = "schema.registry.rest.url"; private static final String SCHEMA_REGISTRY_SCHEMA_NAME = "schema.registry.schema.name"; - private org.apache.avro.Schema _defaultAvroSchema; - private MD5AvroSchemaMap _md5ToAvroSchemaMap; + private Schema _defaultAvroSchema; + private HashToSchemaMap _hashToSchemaMap; // A global cache for schemas across all threads. 
- private static final Map<String, org.apache.avro.Schema> GLOBAL_SCHEMA_CACHE = new HashMap<>(); + private static final Map<String, Schema> GLOBAL_SCHEMA_CACHE = new HashMap<>(); // Suffix for getting the latest schema private static final String LATEST = "-latest"; - // Reusable byte[] to read MD5 from payload. This is OK as this class is used only by a single thread. - private final byte[] _reusableMD5Bytes = new byte[SCHEMA_HASH_LENGTH]; + // Reusable byte[] to read hash from payload. This class is used only by a single thread. + private final byte[] _reusableHashBytes = new byte[SCHEMA_HASH_LENGTH]; private DecoderFactory _decoderFactory; private RecordExtractor<GenericData.Record> _avroRecordExtractor; @@ -98,9 +100,9 @@ public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> { // With the logic below, we may not set defaultAvroSchema to be the latest one everytime. // The schema is fetched once when the machine starts. Until the next restart. the latest schema is // not fetched. - // But then we always pay attention to the exact MD5 hash and attempt to fetch the schema for that particular hash - // before decoding an incoming kafka event. We use defaultAvroSchema only if the fetch for the particular MD5 fails, - // but then we will retry that fetch on every event in case of failure. + // But then we always pay attention to the exact hash and attempt to fetch the schema for that particular hash + // before decoding an incoming kafka event. We use defaultAvroSchema only if the fetch for the particular hash + // fails, but then we will retry that fetch on every event in case of failure. 
synchronized (GLOBAL_SCHEMA_CACHE) { final String hashKey = avroSchemaName + LATEST; _defaultAvroSchema = GLOBAL_SCHEMA_CACHE.get(hashKey); @@ -125,7 +127,7 @@ public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> { _avroRecordExtractor = PluginManager.get().createInstance(recordExtractorClass); _avroRecordExtractor.init(fieldsToRead, config); _decoderFactory = new DecoderFactory(); - _md5ToAvroSchemaMap = new MD5AvroSchemaMap(); + _hashToSchemaMap = new HashToSchemaMap(); } @Nullable @@ -142,25 +144,25 @@ public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> { return null; } - System.arraycopy(payload, SCHEMA_HASH_START_OFFSET + offset, _reusableMD5Bytes, 0, SCHEMA_HASH_LENGTH); + System.arraycopy(payload, SCHEMA_HASH_START_OFFSET + offset, _reusableHashBytes, 0, SCHEMA_HASH_LENGTH); boolean schemaUpdateFailed = false; - org.apache.avro.Schema schema = _md5ToAvroSchemaMap.getSchema(_reusableMD5Bytes); + Schema schema = _hashToSchemaMap.getSchema(_reusableHashBytes); if (schema == null) { // We will get here for the first row consumed in the segment, and every row that has a schema ID that is - // not yet in md5ToAvroSchemaMap. + // not yet in hashToSchemaMap. synchronized (GLOBAL_SCHEMA_CACHE) { - final String hashKey = hex(_reusableMD5Bytes); + final String hashKey = hex(_reusableHashBytes); schema = GLOBAL_SCHEMA_CACHE.get(hashKey); if (schema == null) { // We will get here only if no partition of the table has populated the global schema cache. 
// In that case, one of the consumers will fetch the schema and populate the cache, and the others // should find it in the cache and po - final String schemaUri = "/id=" + hex(_reusableMD5Bytes); + final String schemaUri = "/id=" + hex(_reusableHashBytes); try { schema = fetchSchema(schemaUri); GLOBAL_SCHEMA_CACHE.put(hashKey, schema); - _md5ToAvroSchemaMap.addSchema(_reusableMD5Bytes, schema); + _hashToSchemaMap.addSchema(_reusableHashBytes, schema); } catch (Exception e) { schema = _defaultAvroSchema; LOGGER.error("Error fetching schema using url {}. Attempting to continue with previous schema", schemaUri, @@ -169,7 +171,7 @@ public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> { } } else { LOGGER.info("Found schema for {} in cache", hashKey); - _md5ToAvroSchemaMap.addSchema(_reusableMD5Bytes, schema); + _hashToSchemaMap.addSchema(_reusableHashBytes, schema); } } } @@ -198,22 +200,20 @@ public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> { } private static class SchemaFetcher implements Callable<Boolean> { - private org.apache.avro.Schema _schema; - private URL _url; - private boolean _isSuccessful = false; + private Schema _schema; + private final URL _url; SchemaFetcher(URL url) { _url = url; } @Override - public Boolean call() - throws Exception { + public Boolean call() { try { URLConnection conn = _url.openConnection(); conn.setConnectTimeout(15000); conn.setReadTimeout(15000); - LOGGER.info("Fetching schema using url {}", _url.toString()); + LOGGER.info("Fetching schema using url {}", _url); StringBuilder queryResp = new StringBuilder(); try (BufferedReader reader = new BufferedReader( @@ -222,10 +222,9 @@ public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> { queryResp.append(line); } } + _schema = new SchemaParser().parse(queryResp.toString()); - _schema = org.apache.avro.Schema.parse(queryResp.toString()); - - LOGGER.info("Schema fetch succeeded on url {}", _url.toString()); + 
LOGGER.info("Schema fetch succeeded on url {}", _url); return Boolean.TRUE; } catch (Exception e) { LOGGER.warn("Caught exception while fetching schema", e); @@ -233,12 +232,12 @@ public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> { } } - public org.apache.avro.Schema getSchema() { + public Schema getSchema() { return _schema; } } - private org.apache.avro.Schema fetchSchema(String reference) + private Schema fetchSchema(String reference) throws Exception { SchemaFetcher schemaFetcher = new SchemaFetcher(makeRandomUrl(reference)); RetryPolicies @@ -248,33 +247,33 @@ public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> { } /** - * Private class for encapsulating MD5 to Avro schema mapping. + * Private class for encapsulating hash to Avro schema mapping. * <ul> - * <li> Maintains two lists, one for md5s and another for schema. </li> - * <li> MD5 at index i in the MD5 list, corresponds to Schema at index i in the schema list. </li> + * <li> Maintains two lists, one for hashes and another for schema. </li> + * <li> Hash at index i in the hash list corresponds to schema at index i in the schema list. </li> * </ul> */ - private static class MD5AvroSchemaMap { - private List<byte[]> _md5s; - private List<org.apache.avro.Schema> _schemas; + private static class HashToSchemaMap { + private final List<byte[]> _hashes; + private final List<Schema> _schemas; /** * Constructor for the class. */ - private MD5AvroSchemaMap() { - _md5s = new ArrayList<>(); + private HashToSchemaMap() { + _hashes = new ArrayList<>(); _schemas = new ArrayList<>(); } /** - * Returns the Avro schema corresponding to the given MD5. + * Returns the Avro schema corresponding to the given hash. * - * @param md5ForSchema MD5 for which to get the avro schema. - * @return Avro schema for the given MD5. + * @param hash Hash for which to get the avro schema. + * @return Avro schema for the given hash. 
*/ - private org.apache.avro.Schema getSchema(byte[] md5ForSchema) { - for (int i = 0; i < _md5s.size(); i++) { - if (Arrays.equals(_md5s.get(i), md5ForSchema)) { + private Schema getSchema(byte[] hash) { + for (int i = 0; i < _hashes.size(); i++) { + if (Arrays.equals(_hashes.get(i), hash)) { return _schemas.get(i); } } @@ -282,14 +281,14 @@ public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> { } /** - * Adds mapping between MD5 and Avro schema. - * Caller to ensure that addSchema is called only once per MD5-Schema pair. + * Adds mapping between hash and avro schema. + * Caller to ensure that addSchema is called only once per hash-schema pair. * - * @param md5 MD5 for the Schema + * @param hash Hash for the schema * @param schema Avro Schema */ - private void addSchema(byte[] md5, org.apache.avro.Schema schema) { - _md5s.add(Arrays.copyOf(md5, md5.length)); + private void addSchema(byte[] hash, Schema schema) { + _hashes.add(Arrays.copyOf(hash, hash.length)); _schemas.add(schema); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
