gortiz commented on code in PR #11753:
URL: https://github.com/apache/pinot/pull/11753#discussion_r1348437620
##########
pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufConfluentSchemaTest.java:
##########
@@ -94,8 +103,16 @@ public void testSamplePinotConsumer()
ConsumerRecords<byte[], byte[]> consumerRecords =
kafkaConsumer.poll(Duration.ofMillis(1000));
Iterator<ConsumerRecord<byte[], byte[]>> iter = consumerRecords.iterator();
+ Consumer<Message> onMessage = message -> {
+ // we use this to verify we are not creating/consuming modified descriptors
+ // older versions of confluent connectors (7.1.x and lower) used to rewrite proto3 optional as oneof at descriptor
+ // level. Newer versions of confluent consumers support both alternatives.
+ Descriptors.FieldDescriptor optionalField =
message.getDescriptorForType().findFieldByName("optionalField");
+ Assert.assertNull(optionalField.getRealContainingOneof(), "Received protobuf have been rewritten");
Review Comment:
As we know, proto3 `optional` fields are transformed into a (synthetic) `oneof`.
Older versions of confluent always applied this transformation in the producer
and never in the consumer. From 7.1.x onwards, the producer no longer applies it
and the consumer applies it transparently. That means that:
- The registry stores the original descriptor, which uses `optional`
- The consumer receives the original descriptor and applies the
transformation internally
As explained in [implementing_proto3_presence.md](https://github.com/protocolbuffers/protobuf/blob/main/docs/implementing_proto3_presence.md)
(in the protobuf repo), the proto3 APIs provide a way to verify whether this `oneof`
is synthetic or real. If it is synthetic, it means that it was added at runtime.
Otherwise, it means it was explicitly defined as a `oneof` in the descriptor file.
This is the only way I've found to test whether the received message was
read using a modified protobuf descriptor or not. If we use confluent 5.5.3 (as
we used to do), `optionalField.getRealContainingOneof()` returns a non-null value
(because what is stored in the schema registry is the modified descriptor). When
using 7.4.0 (or any version >= 7.1.0) it returns null.
This test is not great, but it is the best I was able to write. What I would
like to do is to have different producers and different consumers using
different confluent versions and verify that the consumer can always read the
message, independently of the confluent version used by the producer. But I
don't think that can be done in a surefire test, given that only a single
version of a library can be on the classpath.
I've run that test manually, and the compatibility table looks something like
this:
| producer version | consumer version | stored in registry | correct null read |
|---|---|---|---|
| 5.5.3 | 5.5.3 | oneof | yes |
| 5.5.3 | 7.0.x | oneof | yes |
| 5.5.3 | 7.1.x | oneof | yes |
| 5.5.3 | 7.4.x | oneof | yes |
| 7.0.x | 5.5.3 | oneof | yes |
| 7.0.x | 7.0.x | oneof | yes |
| 7.0.x | 7.1.x | oneof | yes |
| 7.0.x | 7.4.x | oneof | yes |
| 7.1.x | 5.5.3 | optional | no |
| 7.1.x | 7.0.x | optional | no |
| 7.1.x | 7.1.x | optional | yes |
| 7.1.x | 7.4.x | optional | yes |
| 7.4.x | 5.5.3 | optional | yes |
| 7.4.x | 7.0.x | optional | yes |
| 7.4.x | 7.1.x | optional | yes |
| 7.4.x | 7.4.x | optional | yes |
So the TL;DR is:
- if we use confluent <= 7.0.x we can read messages emitted with [5.5.3,
7.1.x) but not messages emitted with >= 7.1.x
- if we use confluent >= 7.1.x we can read messages emitted with >= 5.5.3
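For reference, the TL;DR rule can be written down as a small predicate. This is only a sketch of the rule as stated in the two bullets above, not code from Pinot or Confluent: the class `ConfluentProtoCompat` and its methods are hypothetical names, and the 7.1 cut-off is taken from this comment rather than from Confluent documentation.

```java
// Hypothetical helper that encodes the TL;DR rule above; not part of Pinot or Confluent.
final class ConfluentProtoCompat {
    // Assumption from this comment: 7.1 is the first release line where the producer
    // registers the original descriptor (with `optional`) and the consumer handles it.
    private static final int[] FIRST_OPTIONAL_AWARE = {7, 1};

    // Parses versions such as "5.5.3" or "7.1.x" into {major, minor}; the patch part is ignored.
    public static int[] majorMinor(String version) {
        String[] parts = version.split("\\.");
        return new int[] {Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
    }

    // True when version v is greater than or equal to min, comparing major then minor.
    public static boolean atLeast(int[] v, int[] min) {
        return v[0] != min[0] ? v[0] > min[0] : v[1] >= min[1];
    }

    // True when the producer registers the unmodified descriptor (`optional`, not `oneof`).
    public static boolean storesOptional(String producerVersion) {
        return atLeast(majorMinor(producerVersion), FIRST_OPTIONAL_AWARE);
    }

    // A consumer >= 7.1 reads both forms; an older consumer only reads the rewritten `oneof` form.
    public static boolean canReadNulls(String producerVersion, String consumerVersion) {
        boolean consumerHandlesOptional = atLeast(majorMinor(consumerVersion), FIRST_OPTIONAL_AWARE);
        return consumerHandlesOptional || !storesOptional(producerVersion);
    }

    public static void main(String[] args) {
        String[] versions = {"5.5.3", "7.0.x", "7.1.x", "7.4.x"};
        for (String producer : versions) {
            for (String consumer : versions) {
                System.out.printf("| %s | %s | %s | %s |%n", producer, consumer,
                    storesOptional(producer) ? "optional" : "oneof",
                    canReadNulls(producer, consumer) ? "yes" : "no");
            }
        }
    }
}
```

Running `main` prints one row per producer/consumer pair in the same column order as the table, which makes it easy to compare the rule against the manual results.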