This is an automated email from the ASF dual-hosted git repository. dmvk pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git
commit c334cbf24960978461bb3c64eead9907ea10fc99 Author: David Moravek <d...@apache.org> AuthorDate: Fri Jan 12 18:11:15 2024 +0100 [FLINK-34063][runtime] Fix CompressibleFSDataInputStream#seek in case of partial reads. --- .../state/CompressibleFSDataInputStream.java | 6 + .../state/CompressibleFSDataOutputStream.java | 2 +- .../state/CompressibleFSDataInputStreamTest.java | 142 +++++++++++++++++++++ 3 files changed, 149 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompressibleFSDataInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompressibleFSDataInputStream.java index b38554eb0e3..dc95b0dc379 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompressibleFSDataInputStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompressibleFSDataInputStream.java @@ -41,6 +41,12 @@ public class CompressibleFSDataInputStream extends FSDataInputStream { @Override public void seek(long desired) throws IOException { + final int available = compressingDelegate.available(); + if (available > 0) { + if (available != compressingDelegate.skip(available)) { + throw new IOException("Unable to skip buffered data."); + } + } delegate.seek(desired); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompressibleFSDataOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompressibleFSDataOutputStream.java index 9c3628d1223..b0339bed46b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompressibleFSDataOutputStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompressibleFSDataOutputStream.java @@ -33,7 +33,7 @@ public class CompressibleFSDataOutputStream extends FSDataOutputStream { private final OutputStream compressingDelegate; public CompressibleFSDataOutputStream( - CheckpointStateOutputStream delegate, StreamCompressionDecorator compressionDecorator) + FSDataOutputStream delegate, StreamCompressionDecorator compressionDecorator) throws IOException { this.delegate = delegate; this.compressingDelegate = compressionDecorator.decorateWithCompression(delegate); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/CompressibleFSDataInputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CompressibleFSDataInputStreamTest.java new file mode 100644 index 00000000000..085afa69bff --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CompressibleFSDataInputStreamTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.io.InputStreamFSInputWrapper; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; + +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link CompressibleFSDataInputStream}. */ +class CompressibleFSDataInputStreamTest { + + private static class TestingOutputStream extends FSDataOutputStream { + + private final ByteArrayOutputStreamWithPos delegate = new ByteArrayOutputStreamWithPos(); + + @Override + public long getPos() { + return delegate.getPosition(); + } + + @Override + public void flush() throws IOException { + delegate.flush(); + } + + @Override + public void sync() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public void write(int b) { + delegate.write(b); + } + + byte[] toByteArray() { + return delegate.toByteArray(); + } + } + + private static void verifyRecord( + FSDataInputStream inputStream, Map<String, Long> positions, String record) + throws IOException { + inputStream.seek(Objects.requireNonNull(positions.get(record))); + final byte[] readBuffer = new byte[record.getBytes(StandardCharsets.UTF_8).length]; + for (int i = 0; i < readBuffer.length; ++i) { + readBuffer[i] = (byte) inputStream.read(); + } + assertThat(readBuffer).asString(StandardCharsets.UTF_8).isEqualTo(record); + } + + private static void verifyRecordPrefix( + FSDataInputStream inputStream, + Map<String, Long> positions, + String record, + String prefix) + throws IOException { + assertThat(record).startsWith(prefix); + inputStream.seek(Objects.requireNonNull(positions.get(record))); + final byte[] readBuffer = new byte[prefix.getBytes(StandardCharsets.UTF_8).length]; + for (int i = 0; i < readBuffer.length; ++i) { + readBuffer[i] = (byte) inputStream.read(); + } + assertThat(readBuffer).asString(StandardCharsets.UTF_8).isEqualTo(prefix); + } + + @Test + void testSeek() throws IOException { + final List<String> records = Arrays.asList("first", "second", "third", "fourth", "fifth"); + final Map<String, Long> positions = new HashMap<>(); + + byte[] compressedBytes; + try (final TestingOutputStream outputStream = new TestingOutputStream(); + final CompressibleFSDataOutputStream compressibleOutputStream = + new CompressibleFSDataOutputStream( + outputStream, new SnappyStreamCompressionDecorator())) { + for (String record : records) { + positions.put(record, compressibleOutputStream.getPos()); + compressibleOutputStream.write(record.getBytes(StandardCharsets.UTF_8)); + } + compressibleOutputStream.flush(); + compressedBytes = outputStream.toByteArray(); + } + + try (final FSDataInputStream inputStream = + new InputStreamFSInputWrapper(new ByteArrayInputStream(compressedBytes)); + final FSDataInputStream compressibleInputStream = + new CompressibleFSDataInputStream( + inputStream, new SnappyStreamCompressionDecorator())) { + verifyRecord(compressibleInputStream, positions, "first"); + verifyRecord(compressibleInputStream, positions, "third"); + verifyRecord(compressibleInputStream, positions, "fifth"); + } + + // Verify read of partial records. This ensures that we skip any unread data in the + // underlying buffers. + try (final FSDataInputStream inputStream = + new InputStreamFSInputWrapper(new ByteArrayInputStream(compressedBytes)); + final FSDataInputStream compressibleInputStream = + new CompressibleFSDataInputStream( + inputStream, new SnappyStreamCompressionDecorator())) { + verifyRecordPrefix(compressibleInputStream, positions, "first", "fir"); + verifyRecordPrefix(compressibleInputStream, positions, "third", "thi"); + verifyRecord(compressibleInputStream, positions, "fifth"); + } + } +}