[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775037#comment-17775037
 ] 

Luke Kirby commented on KAFKA-15602:
------------------------------------

{quote}The original documentation mentioned the user "{_}Do not need to 
flip{_}", I assume that means the serializer will automatically rewind the 
position to zero for the user as flip explicitly stated ".{_}..limit is set to 
the current position and then the position is set to zero{_}".  However, the 
serializer API didn't make any assumptions to the current position of the 
buffer, so it is hard to say if the original design intended to handle a 
user-given offset... From reading the unit test, it doesn't seem so.
{quote}
Ah, no, there was no documentation on the serializer before; the "Do not need to 
flip" comment was introduced with the breaking change in 3.4. It's pretty plain 
to see, though, that previous users of the serializer likely _were_ flipping: at 
the very least, writing buffers (with content starting at position 0), flipping 
them, and sending them _worked_ before but doesn't now.
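
To make concrete what "flipping... worked but now doesn't" means, here is the buffer state involved, using only java.nio (a standalone illustration, not Kafka code):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

ByteBuffer bb = ByteBuffer.allocate(8);
bb.put("test".getBytes(StandardCharsets.UTF_8));
// After writing: position = 4, limit = 8 (the capacity)
bb.flip();
// After flip(): position = 0, limit = 4
// Pre-3.4.0 the serializer read [0, limit()) and produced the 4 bytes "test";
// in 3.4.0+ the position is treated as the end of the data, so this same
// flipped buffer serializes to 0 bytes.
```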

Looking at the history for the file, it does look like prior to the 3.4 change 
it always did a rewind(), so yeah, position() on the buffer has never really 
been respected. Now it kind of _does_ respect position, but not in the expected 
way: it essentially identifies the end of the buffer rather than the start.
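
Modeled with plain java.nio, the two contracts might be sketched like this (a reconstruction inferred from the observed outputs, not the actual Kafka implementation; note that both real versions also special-case a plain ByteBuffer.wrap(byte[]) and return its backing array directly, a shortcut omitted here):

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

// Pre-3.4.0 contract: rewind() to 0, then serialize [0, limit());
// the caller's position() is ignored entirely.
Function<ByteBuffer, byte[]> legacySerialize = data -> {
    ByteBuffer d = data.duplicate(); // leave the caller's buffer untouched
    d.rewind();                      // position := 0
    byte[] out = new byte[d.limit()];
    d.get(out);
    return out;
};

// 3.4.0+ contract (as observed): an internal flip(), so the caller's
// position() marks the END of the data and the original limit() is discarded.
Function<ByteBuffer, byte[]> serialize34 = data -> {
    ByteBuffer d = data.duplicate();
    d.flip();                        // limit := old position, position := 0
    byte[] out = new byte[d.remaining()];
    d.get(out);
    return out;
};
```

Under these models, a written-then-flipped buffer (position = 0, limit = n) serializes to n bytes pre-3.4.0 and to 0 bytes in 3.4.0+, which matches the table in the issue description below.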

I don't know why the original version used a rewind() rather than expecting the 
buffer to already have a position() reflecting the start point to read from. In 
my experience that is clearly the convention for passing ByteBuffer inputs for 
reading in every library I'm familiar with, and I expect it is the expectation 
of most users; perhaps I'm ignorant of some other pattern! There may be some 
valid contextual reason for the explicit rewind()s, but it seems unusual. My 
suggestion would be to replace the currently named ByteBufferSerializer with a 
new, differently named one that respects the standard interface unambiguously, 
i.e., one that serializes data from buffer.position() to buffer.limit(), 
continuing to use appropriate shortcuts for the simple wrap(byte[]) cases.
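
A sketch of what such a replacement might do (the name and the exact shortcut condition are illustrative assumptions, not an actual Kafka API):

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

// Hypothetical replacement: serialize exactly the readable region
// [position(), limit()), keeping a shortcut for plain wrap(byte[]) buffers.
Function<ByteBuffer, byte[]> positionRespectingSerialize = data -> {
    if (data.hasArray() && data.arrayOffset() == 0
            && data.position() == 0 && data.limit() == data.array().length) {
        return data.array();       // plain ByteBuffer.wrap(byte[]) shortcut
    }
    byte[] out = new byte[data.remaining()];
    data.duplicate().get(out);     // duplicate() keeps the caller's position intact
    return out;
};
```

Under this contract the KAFKA-4852 case works as originally requested: ByteBuffer.wrap(bytes, 1, bytes.length - 1) over "Hello" yields "ello".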
{quote}I wonder if the sensible thing to do for now is to make the 
contract and expectations more explicit on the Javadoc, i.e., the serialize 
method doesn't support user-provided offset, and the serializer will rewind the 
current offset to zero for the user.
{quote}
Explicit documentation would surely be an improvement, and its absence is 
probably at the core of how this situation arose, but it doesn't do anything 
for folks surprised by the completely different behavior and broken output on 
upgrading to 3.4.0, given code that worked from 0.10 through 3.3.2... though 
hopefully they notice it.

> Breaking change in 3.4.0 ByteBufferSerializer
> ---------------------------------------------
>
>                 Key: KAFKA-15602
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15602
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>            Reporter: Luke Kirby
>            Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produces 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails under both versions, for 
> different reasons (the expected value would be "est"). As demonstrated here, 
> you can ensure that a manually assembled ByteBuffer will work under both 
> versions by ensuring that it has position == limit == message-length (and an 
> actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. Essentially, the prevailing 
> contract was that the data from position 0 (always!) up to the limit on the 
> buffer would be serialized; it was really just the limit that was honored. So 
> if, per the original issue, you have a byte[] array wrapped with, say, 
> ByteBuffer.wrap(bytes, 3, 5), that yields a ByteBuffer with position = 3 
> indicating the desired start point to read from, which the serializer then 
> effectively ignores due to the rewind().
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing array, it did however follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays and using limit() to identify the end of actual 
> data, consistent with conventional usage of flip() to switch from writing to 
> a buffer to setting it up to be read from (e.g., to be passed into a 
> producer.send() call). E.g.,
> {code:java}
> ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
> // ... some sequence of:
> bb.put(...); // populating the buffer with some number of bytes less than TOO_MUCH
> // ...
> bb.flip(); /* logically, this says "I am done writing, let's set this up for
> reading"; pragmatically, it sets the limit to the current position so that
> whoever reads the buffer knows when to stop reading, and sets the position to
> zero so it knows where to start reading from */
> producer.send(bb); {code}
> Technically, you wouldn't even need to use flip() there, since position is 
> ignored; it would be sufficient to just call {{{}bb.limit(bb.position()){}}}. 
> Notably, a buffer constructed using any variant of ByteBuffer.wrap() is 
> essentially immediately in read-mode with position indicating the start and 
> limit the end.
> With the change introduced in 3.4.0, however, the contract changes 
> dramatically, and the code just presented produces a 0-byte message. As 
> indicated above, it also continues to fail if you just passed in an 
> offset-specified ByteBuffer.wrap()ped message, too, i.e., the case described 
> by KAFKA-4852:
> {code:java}
> @Test
> public void testByteBufferSerializerOnOffsetWrappedBytes() {
>     final byte[] bytes = "Hello".getBytes(UTF_8);
>     try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
>         assertArrayEquals("ello".getBytes(UTF_8), 
>                 // FAILS: this will yield "H", not "ello"
>                 serializer.serialize(topic, ByteBuffer.wrap(bytes, 1, 
> bytes.length - 1)));
>     }
> }
>  {code}
> What happened here?
> The resulting PR, it seems, focused on a flawed proposed test case in the 
> first comment of KAFKA-4852 that failed against pre-3.4.0 Kafka. I reproduce 
> it here with my own annotations in comments: 
> {code:java}
> @Test // flawed proposed test case
> public void testByteBufferSerializer() {
>     final byte[] bytes = "Hello".getBytes(UTF_8);
>     final ByteBuffer buffer = ByteBuffer.allocate(7);
>     buffer.put(bytes);
>     // buffer.flip();   <-- would make the test work
>     try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
>         assertArrayEquals(bytes, serializer.serialize(topic, buffer));
>     }
> }  {code}
> In pre-3.4.0, this would yield a 7-byte serialization of "Hello" followed by 
> 2 0-value bytes. I contend that this was actually expected and correct 
> behavior, as the ByteBuffer had never had its limit set, so the implicit and 
> mildly expected contract was never actually abided by. If there was a 
> buffer.flip() after the .put(bytes) call, as the calling code _should_ have 
> applied, however, then the test would have succeeded. In short, by trying to 
> make this test case succeed, I think this PR represented nothing more than a 
> misunderstanding of how one should prepare ByteBuffers for reading, but it 
> has managed to result in a breaking change. The breaking nature of this was 
> actually briefly noted in the PR comments but kind of shrugged off with some 
> test changes and explanatory comments on the class.
> Obviously a correction to restore 3.3.2 behavior would represent another 
> breaking change for users that are running on 3.4+, unless they were also 
> somewhat surprisingly configuring buffers for position() == limit() before 
> passing them to send. Arguably, it would also be a breaking change (though 
> possibly not one of great consequence) if either version was changed to 
> correctly handle the wrapped-with-offset case as represented in the original 
> ticket.
> I do not have much experience contending with a situation like this, but at 
> the risk of jumping to a solution here, I wonder if the only way to really 
> move forward safely and unambiguously here is to remove ByteBufferSerializer 
> as it stands and replace it with a differently named substitute that handles 
> both the plain-wrapped special case and just serializes content from 
> position() to limit(), forcing an evaluation by users when upgrading as to 
> whether the provided byte buffer is correctly configured or not. Of course, a 
> change like that would have to be released at an appropriate version level, 
> too, so I don't know exactly what the desired interim behavior would be 
> (deprecation?). I believe I would be eager to contribute to a fix, but 
> obviously I would need guidance from maintainers regarding the correct path 
> forward semantically.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
