Myasuka commented on a change in pull request #13885:
URL: https://github.com/apache/flink/pull/13885#discussion_r517888118
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FSDataBufferedInputStream.java ##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * FSDataInputStream with Buffer
+ */
+public class FSDataBufferedInputStream extends FSDataInputStream {

Review comment:
I think most of your code is copied from `java.io.BufferedInputStream`, right? Please add a note about this in the Javadoc.

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FSDataBufferedInputStream.java ##########
@@ -0,0 +1,189 @@
+public class FSDataBufferedInputStream extends FSDataInputStream {
+
+	private static final int DEFAULT_BUFFER_SIZE = 8192;
+
+	protected volatile byte[] buf;
+
+	// read offset of buf
+	private int pos;
+
+	// availed count of buf
+	private int count;
+
+	private final FSDataInputStream inputStream;
+
+	private volatile boolean closed;
+
+	public FSDataBufferedInputStream(FSDataInputStream inputStream) {
+		this(inputStream, DEFAULT_BUFFER_SIZE);
+	}
+
+	public FSDataBufferedInputStream(
+		FSDataInputStream inputStream,
+		int bufferSize) {
+		this.inputStream = inputStream;
+
+		Preconditions.checkState(bufferSize > 0,
+			new IllegalArgumentException("bufferSize must > 0"));

Review comment:
We should pass an error message here instead of an `IllegalArgumentException`.

########## File path: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java ##########
@@ -157,4 +157,14 @@
 			.withDescription(String.format("The default size of the write buffer for the checkpoint streams that write to file systems. " +
 				"The actual write buffer size is determined to be the maximum of the value of this option and option '%s'.", FS_SMALL_FILE_THRESHOLD.key()));
+	/**
+	 * The default size of the read buffer for the checkpoint streams that read from file systems.
+	 */
+	@Documentation.Section(Documentation.Sections.EXPERT_STATE_BACKENDS)
+	public static final ConfigOption<Integer> FS_READ_BUFFER_SIZE = ConfigOptions
+		.key("state.backend.fs.read-buffer-size")
+		.intType()

Review comment:
I am also in favor of the `memory type`, as the buffer is just an array of bytes allocated from JVM memory. The reason `state.backend.fs.write-buffer-size` is of integer type is that it was implemented before Flink introduced the `memory type` configuration.

########## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ##########
@@ -877,6 +913,20 @@ public long getWriteBatchSize() {
 			WRITE_BATCH_SIZE.defaultValue().getBytes() : writeBatchSize;
 	}
+	/**
+	 * Gets the read buffer size for read checkpoint stream.
+	 *
+	 * <p>If not explicitly configured, this is the default value of
+	 * {@link CheckpointingOptions#FS_READ_BUFFER_SIZE}.
+	 *
+	 * @return The file system read buffer size, in bytes.
+	 */
+	public int getFsReadBufferSize() {

Review comment:
As you can see, there are three duplicate `getFsReadBufferSize` methods in your PR, which could become one abstract base method for all state backends. Taking this one step further: state backends now share common interfaces for checkpoint writes, e.g. `createCheckpointStorage`, but they do not share common interfaces for checkpoint **reads**. That is why you run into this awkward situation here. I think this deserves a discussion in FLINK-19465 and FLIP-142. @sjwiesman, what do you think?

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FSDataBufferedInputStream.java ##########
@@ -0,0 +1,189 @@
+public class FSDataBufferedInputStream extends FSDataInputStream {
+
+	private static final int DEFAULT_BUFFER_SIZE = 8192;
+
+	protected volatile byte[] buf;

Review comment:
Unlike `java.io.BufferedInputStream`, `FSDataBufferedInputStream` should only be used by a single thread, so there is no need to make `buf` `volatile`.

########## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ##########
@@ -264,12 +270,36 @@ public RocksDBStateBackend(StateBackend checkpointStreamBackend) {
 	 * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled.
 	 */
 	public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing) {
+		this(checkpointStreamBackend, enableIncrementalCheckpointing, -1);
+	}
+
+	/**
+	 * Creates a new {@code RocksDBStateBackend} that uses the given state backend to store its
+	 * checkpoint data streams. Typically, one would supply a filesystem or database state backend
+	 * here where the snapshots from RocksDB would be stored.
+	 *
+	 * <p>The snapshots of the RocksDB state will be stored using the given backend's
+	 * {@link StateBackend#createCheckpointStorage(JobID)}.
+	 *
+	 * @param checkpointStreamBackend The backend write the checkpoint streams to.
+	 * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled.
+	 * @param fsReadBufferSize The read buffer size for read checkpoint stream.
+	 */
+	public RocksDBStateBackend(
+		StateBackend checkpointStreamBackend,
+		TernaryBoolean enableIncrementalCheckpointing,
+		int fsReadBufferSize) {

Review comment:
I don't think we should add a new constructor here. Making `fsReadBufferSize` one of the properties of `FsStateBackend` is enough.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
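The two review points on `FSDataBufferedInputStream` (validate `bufferSize` with an error message rather than an exception object, and drop `volatile` because only one thread reads the stream) can be illustrated with a minimal sketch. It is not the PR's code: it wraps a plain `java.io.InputStream` instead of Flink's `FSDataInputStream` (so it has no `seek`/`getPos`), and the class name `SingleThreadedBufferedInputStream` is hypothetical. Inside Flink the check would more naturally be `Preconditions.checkArgument(bufferSize > 0, "...")`, which takes the message object and throws `IllegalArgumentException` itself.

```java
import java.io.IOException;
import java.io.InputStream;

/**
 * Hypothetical sketch, not the PR's implementation: a buffered wrapper in the style of
 * java.io.BufferedInputStream, intended for use by a single thread only.
 */
class SingleThreadedBufferedInputStream extends InputStream {

    private static final int DEFAULT_BUFFER_SIZE = 8192;

    private final InputStream in;
    // Plain (non-volatile) fields: only one thread reads this stream.
    private final byte[] buf;
    private int pos;      // next read offset in buf
    private int count;    // number of valid bytes in buf
    private boolean closed;

    SingleThreadedBufferedInputStream(InputStream in) {
        this(in, DEFAULT_BUFFER_SIZE);
    }

    SingleThreadedBufferedInputStream(InputStream in, int bufferSize) {
        // Same effect as Preconditions.checkArgument(cond, message): the caller supplies a message.
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be > 0, but was " + bufferSize);
        }
        this.in = in;
        this.buf = new byte[bufferSize];
    }

    /** Refills buf from the wrapped stream; returns false at end of stream. */
    private boolean fill() throws IOException {
        pos = 0;
        count = 0;
        int n = in.read(buf, 0, buf.length);
        if (n > 0) {
            count = n;
        }
        return n > 0;
    }

    @Override
    public int read() throws IOException {
        ensureOpen();
        if (pos >= count && !fill()) {
            return -1;
        }
        return buf[pos++] & 0xFF; // return the byte as an unsigned value 0..255
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        ensureOpen();
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        }
        if (len == 0) {
            return 0;
        }
        if (pos >= count && !fill()) {
            return -1;
        }
        // Serve only what is currently buffered; a short read is allowed by InputStream.
        int n = Math.min(len, count - pos);
        System.arraycopy(buf, pos, b, off, n);
        pos += n;
        return n;
    }

    @Override
    public int available() throws IOException {
        ensureOpen();
        return (count - pos) + in.available();
    }

    @Override
    public void close() throws IOException {
        if (!closed) {
            closed = true;
            in.close();
        }
    }

    private void ensureOpen() throws IOException {
        if (closed) {
            throw new IOException("Stream closed");
        }
    }
}
```

Unlike `java.io.BufferedInputStream`, this sketch does not support `mark`/`reset`, and a bulk `read` may return fewer bytes than requested when the buffer holds only part of them; both behaviors are permitted by the `InputStream` contract.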
