This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 45ba7ee87ca [FLINK-32326][state] Disable WAL in RocksDBWriteBatchWrapper by default. (#22771) 45ba7ee87ca is described below commit 45ba7ee87caee63a0babfd421b7c5eabaa779baa Author: Stefan Richter <srich...@apache.org> AuthorDate: Wed Jun 14 19:23:48 2023 +0200 [FLINK-32326][state] Disable WAL in RocksDBWriteBatchWrapper by default. (#22771) * [FLINK-32326][state] Disable WAL in RocksDBWriteBatchWrapper by default. Disables WAL by default in RocksDBWriteBatchWrapper for the case that no explicit WriteOption is passed in. This is the case in all restore operations and can impact the performance. * [hotfix][state] Replace deprecated API call WriteBatch::remove with WriteBatch::delete. --- .../streaming/state/RocksDBWriteBatchWrapper.java | 40 ++++++++++++++-------- .../state/RocksDBWriteBatchWrapperTest.java | 37 ++++++++++++++++++++ 2 files changed, 63 insertions(+), 14 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java index 3906c74972c..354009e335c 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java @@ -32,6 +32,9 @@ import javax.annotation.Nonnegative; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; + /** * It's a wrapper class around RocksDB's {@link WriteBatch} for writing in bulk. 
* @@ -55,6 +58,9 @@ public class RocksDBWriteBatchWrapper implements AutoCloseable { @Nonnegative private final long batchSize; + /** List of all objects that we need to close in close(). */ + private final List<AutoCloseable> toClose; + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, long writeBatchSize) { this(rocksDB, null, 500, writeBatchSize); } @@ -79,9 +85,9 @@ public class RocksDBWriteBatchWrapper implements AutoCloseable { Preconditions.checkArgument(batchSize >= 0, "Max batch size have to be no negative."); this.db = rocksDB; - this.options = options; this.capacity = capacity; this.batchSize = batchSize; + this.toClose = new ArrayList<>(2); if (this.batchSize > 0) { this.batch = new WriteBatch( @@ -89,6 +95,15 @@ public class RocksDBWriteBatchWrapper implements AutoCloseable { } else { this.batch = new WriteBatch(this.capacity * PER_RECORD_BYTES); } + this.toClose.add(this.batch); + if (options != null) { + this.options = options; + } else { + // Use default write options with disabled WAL + this.options = new WriteOptions().setDisableWAL(true); + // We own this object, so we must ensure that we close it. + this.toClose.add(this.options); + } } public void put(@Nonnull ColumnFamilyHandle handle, @Nonnull byte[] key, @Nonnull byte[] value) @@ -102,33 +117,30 @@ public class RocksDBWriteBatchWrapper implements AutoCloseable { public void remove(@Nonnull ColumnFamilyHandle handle, @Nonnull byte[] key) throws RocksDBException { - batch.remove(handle, key); + batch.delete(handle, key); flushIfNeeded(); } public void flush() throws RocksDBException { - if (options != null) { - db.write(options, batch); - } else { - // use the default WriteOptions, if wasn't provided. 
- try (WriteOptions writeOptions = new WriteOptions()) { - db.write(writeOptions, batch); - } - } + db.write(options, batch); batch.clear(); } - public WriteOptions getOptions() { + @VisibleForTesting + WriteOptions getOptions() { return options; } @Override public void close() throws RocksDBException { - if (batch.count() != 0) { - flush(); + try { + if (batch.count() != 0) { + flush(); + } + } finally { + IOUtils.closeAllQuietly(toClose); } - IOUtils.closeQuietly(batch); } private void flushIfNeeded() throws RocksDBException { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java index c7b725fe73c..6a7c95af067 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java @@ -35,6 +35,8 @@ import java.util.concurrent.ThreadLocalRandom; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.WRITE_BATCH_SIZE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; /** Tests to guard {@link RocksDBWriteBatchWrapper}. */ public class RocksDBWriteBatchWrapperTest { @@ -123,4 +125,39 @@ public class RocksDBWriteBatchWrapperTest { assertEquals(initBatchSize, writeBatchWrapper.getDataSize()); } } + + /** + * Test that {@link RocksDBWriteBatchWrapper} creates default {@link WriteOptions} with disabled + * WAL and closes them correctly. 
+ */ + @Test + public void testDefaultWriteOptionsHaveDisabledWAL() throws Exception { + WriteOptions options; + try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath()); + RocksDBWriteBatchWrapper writeBatchWrapper = + new RocksDBWriteBatchWrapper(db, null, 200, 50)) { + options = writeBatchWrapper.getOptions(); + assertTrue(options.isOwningHandle()); + assertTrue(options.disableWAL()); + } + assertFalse(options.isOwningHandle()); + } + + /** + * Test that {@link RocksDBWriteBatchWrapper} respects passed in {@link WriteOptions} and does + * not close them. + */ + @Test + public void testNotClosingPassedInWriteOption() throws Exception { + try (WriteOptions passInOption = new WriteOptions().setDisableWAL(false)) { + try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath()); + RocksDBWriteBatchWrapper writeBatchWrapper = + new RocksDBWriteBatchWrapper(db, passInOption, 200, 50)) { + WriteOptions options = writeBatchWrapper.getOptions(); + assertTrue(options.isOwningHandle()); + assertFalse(options.disableWAL()); + } + assertTrue(passInOption.isOwningHandle()); + } + } }