[ https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17774807#comment-17774807 ]

Philip Nee edited comment on KAFKA-15602 at 10/13/23 6:26 AM:
--------------------------------------------------------------

Hi [~luke.kirby] ,

Thanks for reporting this.  I briefly went over the code and I can now 
understand your concern.

The original documentation said the user "{_}does not need to flip{_}".  I 
assume that means the serializer will automatically rewind the position to 
zero for the user, since flip() explicitly states "{_}...limit is set to the 
current position and then the position is set to zero{_}".  However, the 
serializer API never made any assumptions about the current position of the 
buffer, so it is hard to say whether the original design intended to handle a 
user-given offset... From reading the unit tests, it doesn't seem so.

I can see the problem: the serializer doesn't know whether the position is an 
offset into the data or just the next byte to be written.  These are two 
different definitions of the position, so it doesn't really make sense to 
handle both cases in a single API call.
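
For example, a minimal sketch of the two interpretations (plain JDK behavior, 
nothing Kafka-specific):

{code:java}
byte[] bytes = "test".getBytes(StandardCharsets.UTF_8);

// Interpretation 1: position marks the next byte to be written.
ByteBuffer written = ByteBuffer.allocate(8).put(bytes); // position == 4, limit == 8

// Interpretation 2: position marks the offset where the data starts.
ByteBuffer wrapped = ByteBuffer.wrap(bytes, 1, 3);      // position == 1, limit == 4
{code}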

I wonder if we could do the following:

If the user doesn't want to provide an offset, which is the most common use 
case, the user may continue using the existing API.

If the user wraps the buffer with an offset, we might want to provide another 
API for that case, so that the serializer knows to honor the buffer's position 
as the start of the data (rough sketch below).
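
For illustration only, a hypothetical shape for such a split (the second 
method name is made up, not an actual proposal):

{code:java}
// Existing API, unchanged: the common no-offset case keeps working as before.
byte[] serialize(String topic, ByteBuffer data);

// Hypothetical new API: for buffers wrapped with an offset, serialize exactly
// the readable region, i.e. the bytes from position() up to limit().
byte[] serializeWithOffset(String topic, ByteBuffer data);
{code}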

[~guozhang] - Would you kindly provide some input on this issue, since you 
reviewed KAFKA-4852?

Thanks!

P



> Breaking change in 3.4.0 ByteBufferSerializer
> ---------------------------------------------
>
>                 Key: KAFKA-15602
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15602
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>            Reporter: Luke Kirby
>            Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produces 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails under both versions, for 
> different reasons (the expected value would be "est"). As demonstrated here, 
> you can ensure that a manually assembled ByteBuffer will work under both 
> versions by ensuring that your buffers have position == limit == message-length 
> (and an actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third cases there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you wrap a byte[] array with, 
> say, ByteBuffer.wrap(bytes, 3, 5), that yields a ByteBuffer with position = 3 
> indicating the desired start point to read from, which is then effectively 
> ignored by the serializer due to the rewind().
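> For reference, the pre-3.4.0 implementation was approximately the following 
> (paraphrased from the linked 3.3.x source; a sketch, not verbatim):
> {code:java}
> public byte[] serialize(String topic, ByteBuffer data) {
>     if (data == null)
>         return null;
>     data.rewind(); // position is unconditionally reset to 0 here
>     if (data.hasArray()) {
>         byte[] arr = data.array();
>         if (data.arrayOffset() == 0 && arr.length == data.remaining())
>             return arr; // plain wrap of a whole array: return it without copying
>     }
>     byte[] ret = new byte[data.remaining()];
>     data.get(ret, 0, ret.length);
>     data.rewind();
>     return ret;
> }
> {code}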
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing array, it did however follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays and using limit() to identify the end of actual 
> data, consistent with conventional usage of flip() to switch from writing to 
> a buffer to setting it up to be read from (e.g., to be passed into a 
> producer.send() call). E.g.,
> {code:java}
> ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
> ... // some sequence of 
> bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH 
> ... 
> bb.flip(); /* logically, this says "I am done writing, let's set this up for 
> reading"; pragmatically, it sets the limit to the current position so that 
> whoever reads the buffer knows when to stop reading, and sets the position to 
> zero so it knows where to start reading from */ 
> producer.send(new ProducerRecord<>(topic, bb)); {code}
> Technically, you wouldn't even need to use flip() there, since position is 
> ignored; it would be sufficient to just call {{{}bb.limit(bb.position()){}}}. 
> Notably, a buffer constructed using any variant of ByteBuffer.wrap() is 
> essentially immediately in read-mode with position indicating the start and 
> limit the end.
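> To illustrate that read-mode framing (standard JDK semantics):
> {code:java}
> byte[] bytes = "test".getBytes(UTF_8);
> ByteBuffer bb = ByteBuffer.wrap(bytes, 1, 3);
> // position() == 1 (start of the data), limit() == 4 (end of the data),
> // so remaining() == 3 and the readable region is "est"
> {code}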
> With the change introduced in 3.4.0, however, the contract changes 
> dramatically, and the code just presented produces a 0-byte message. As 
> indicated above, it also continues to fail if you pass in an 
> offset-specified ByteBuffer.wrap()ped message, i.e., the case described 
> by KAFKA-4852:
> {code:java}
> @Test
> public void testByteBufferSerializerOnOffsetWrappedBytes() {
>     final byte[] bytes = "Hello".getBytes(UTF_8);
>     try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
>         assertArrayEquals("ello".getBytes(UTF_8),
>                 // FAILS: this will yield "H", not "ello"
>                 serializer.serialize(topic, ByteBuffer.wrap(bytes, 1, bytes.length - 1)));
>     }
> }
>  {code}
> What happened here?
> The resulting PR, it seems, focused on a flawed proposed test case in the 
> first comment of KAFKA-4852 that failed against pre-3.4.0 Kafka. I reproduce 
> it here, annotated with my own comments: 
> {code:java}
> @Test // flawed proposed test case
> public void testByteBufferSerializer() {
>     final byte[] bytes = "Hello".getBytes(UTF_8);
>     final ByteBuffer buffer = ByteBuffer.allocate(7);
>     buffer.put(bytes);
>     // buffer.flip();   <-- would make the test work
>     try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
>         assertArrayEquals(bytes, serializer.serialize(topic, buffer));
>     }
> }  {code}
> In pre-3.4.0, this would yield a 7-byte serialization of "Hello" followed by 
> 2 0-value bytes. I contend that this was actually expected and correct 
> behavior: the ByteBuffer never had its limit set, so the test itself never 
> abided by the implicit (and mildly expected) contract. If the test had called 
> buffer.flip() after the .put(bytes) call, as the calling code _should_ have 
> done, it would have succeeded. In short, by trying to make this test case 
> succeed, I think this PR represented nothing more than a misunderstanding of 
> how one should prepare ByteBuffers for reading, but it has managed to result 
> in a breaking change. The breaking nature of this was 
> actually briefly noted in PR comments but kind of shrugged off with some test 
> changes and explanatory comments on the class.
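> For clarity, here is the corrected form of that test, which passes against 
> pre-3.4.0 releases (a sketch applying the flip() fix noted above):
> {code:java}
> @Test // corrected test case
> public void testByteBufferSerializer() {
>     final byte[] bytes = "Hello".getBytes(UTF_8);
>     final ByteBuffer buffer = ByteBuffer.allocate(7);
>     buffer.put(bytes);
>     buffer.flip(); // limit = 5, position = 0: buffer is now set up for reading
>     try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
>         assertArrayEquals(bytes, serializer.serialize(topic, buffer));
>     }
> }
> {code}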
> Obviously a correction to restore 3.3.2 behavior would represent another 
> breaking change for users that are running on 3.4+, unless they were also 
> somewhat surprisingly configuring buffers for position() == limit() before 
> passing them to send. Arguably, it would also be a breaking change (though 
> possibly not one of great consequence) if either version was changed to 
> correctly handle the wrapped-with-offset case as represented in the original 
> ticket.
> I do not have much experience contending with a situation like this, but at 
> the risk of jumping to a solution here, I wonder if the only way to really 
> move forward safely and unambiguously here is to remove ByteBufferSerializer 
> as it stands and replace it with a differently named substitute that handles 
> both the plain-wrapped special case and just serializes content from 
> position() to limit(), forcing an evaluation by users when upgrading as to 
> whether the provided byte buffer is correctly configured or not. Of course, a 
> change like that would have to be released at an appropriate version level, too, 
> so I don't know exactly what the desired interim behavior would be 
> (deprecation?). I believe I would be eager to contribute to a fix, but 
> obviously I would need guidance from maintainers regarding the correct path 
> forward semantically.
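> To make that concrete, a rough sketch of what such a substitute could look 
> like (the class name is made up and purely illustrative):
> {code:java}
> public class PositionAwareByteBufferSerializer implements Serializer<ByteBuffer> {
>     @Override
>     public byte[] serialize(String topic, ByteBuffer data) {
>         if (data == null)
>             return null;
>         // Plain-wrapped special case: a whole-array wrap needs no copy.
>         if (data.hasArray() && data.arrayOffset() == 0
>                 && data.position() == 0 && data.limit() == data.array().length) {
>             return data.array();
>         }
>         // General case: serialize exactly position() through limit().
>         byte[] out = new byte[data.remaining()];
>         data.duplicate().get(out); // duplicate() leaves the caller's position intact
>         return out;
>     }
> }
> {code}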



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
