Repository: flink Updated Branches: refs/heads/master abc3e1c88 -> 7d026aa72
[FLINK-7757] [checkpointing] Introduce resource guard for RocksDBKeyedStateBackend to reduce locking and avoid blocking behavior. This closes #4764. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d026aa7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d026aa7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d026aa7 Branch: refs/heads/master Commit: 7d026aa72852f32c0ca29924c9e2b565de41b710 Parents: abc3e1c Author: Stefan Richter <[email protected]> Authored: Tue Oct 3 15:37:58 2017 +0200 Committer: Stefan Richter <[email protected]> Committed: Wed Oct 18 17:12:20 2017 +0800 ---------------------------------------------------------------------- .../connectors/kafka/FlinkKafkaProducer011.java | 2 +- .../kafka/FlinkKafkaProducerBase.java | 2 +- .../kafka/testutils/IntegerSource.java | 2 +- .../state/RocksDBKeyedStateBackend.java | 181 ++++++++----------- .../state/RocksDBStateBackendTest.java | 54 +----- .../org/apache/flink/util/ResourceGuard.java | 152 ++++++++++++++++ .../apache/flink/util/SerializableObject.java | 28 +++ .../apache/flink/util/ResourceGuardTest.java | 133 ++++++++++++++ .../state/DefaultOperatorStateBackend.java | 16 +- .../flink/runtime/util/SerializableObject.java | 28 --- .../checkpoint/CheckpointStateRestoreTest.java | 2 +- .../runtime/state/StateBackendTestBase.java | 71 +++++++- .../api/functions/sink/SocketClientSink.java | 2 +- 13 files changed, 478 insertions(+), 195 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java index 67e237d..8b1a62d 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java @@ -28,7 +28,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer; @@ -42,6 +41,7 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.NetUtils; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializableObject; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java index befc1a1..cf07a23 100644 --- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java @@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -33,6 +32,7 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegat import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.util.NetUtils; +import org.apache.flink.util.SerializableObject; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java index ef50766..25a3cea 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java +++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java @@ -19,9 +19,9 @@ package org.apache.flink.streaming.connectors.kafka.testutils; import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.util.SerializableObject; import java.util.Collections; import java.util.List; http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 8236e5b..f67daab 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -72,11 +72,11 @@ import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.runtime.state.internal.InternalReducingState; import org.apache.flink.runtime.state.internal.InternalValueState; -import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; import 
org.apache.flink.util.StateMigrationException; import org.rocksdb.Checkpoint; @@ -158,11 +158,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private final File instanceRocksDBPath; /** - * Lock for protecting cleanup of the RocksDB against the checkpointing thread. We acquire this when doing - * asynchronous checkpoints and when disposing the DB. Otherwise, the asynchronous snapshot might try - * iterating over a disposed DB. After aquriring the lock, always first check if (db == null). + * Protects access to RocksDB in other threads, like the checkpointing thread, from parallel calls that dispose the + * RocksDB object. */ - private final SerializableObject asyncSnapshotLock = new SerializableObject(); + private final ResourceGuard rocksDBResourceGuard; /** * Our RocksDB data base, this is used by the actual subclasses of {@link AbstractRocksDBState} @@ -225,6 +224,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier); this.enableIncrementalCheckpointing = enableIncrementalCheckpointing; + this.rocksDBResourceGuard = new ResourceGuard(); // ensure that we use the right merge operator, because other code relies on this this.columnOptions = Preconditions.checkNotNull(columnFamilyOptions) @@ -281,30 +281,29 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { public void dispose() { super.dispose(); - // Acquire the lock, so that no ongoing snapshots access the db during cleanup - synchronized (asyncSnapshotLock) { - // IMPORTANT: null reference to signal potential async checkpoint workers that the db was disposed, as - // working on the disposed object results in SEGFAULTS. Other code has to check field #db for null - // and access it in a synchronized block that locks on #dbDisposeLock. 
- if (db != null) { - - // RocksDB's native memory management requires that *all* CFs (including default) are closed before the - // DB is closed. So we start with the ones created by Flink... - for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnMetaData : - kvStateInformation.values()) { - IOUtils.closeQuietly(columnMetaData.f0); - } + // This call will block until all clients that still acquired access to the RocksDB instance have released it, + // so that we cannot release the native resources while clients are still working with it in parallel. + rocksDBResourceGuard.close(); + + // IMPORTANT: null reference to signal potential async checkpoint workers that the db was disposed, as + // working on the disposed object results in SEGFAULTS. + if (db != null) { - // ... close the default CF ... - IOUtils.closeQuietly(defaultColumnFamily); + // RocksDB's native memory management requires that *all* CFs (including default) are closed before the + // DB is closed. So we start with the ones created by Flink... + for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnMetaData : + kvStateInformation.values()) { + IOUtils.closeQuietly(columnMetaData.f0); + } - // ... and finally close the DB instance ... - IOUtils.closeQuietly(db); + // ... close the default CF ... + IOUtils.closeQuietly(defaultColumnFamily); - // invalidate the reference before releasing the lock so that other accesses will not cause crashes - db = null; + // ... and finally close the DB instance ... 
+ IOUtils.closeQuietly(db); - } + // invalidate the reference before releasing the lock so that other accesses will not cause crashes + db = null; } kvStateInformation.clear(); @@ -356,6 +355,18 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { final long checkpointTimestamp, final CheckpointStreamFactory checkpointStreamFactory) throws Exception { + if (db == null) { + throw new IOException("RocksDB closed."); + } + + if (kvStateInformation.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + + checkpointTimestamp + " . Returning null."); + } + return DoneFuture.nullValue(); + } + final RocksDBIncrementalSnapshotOperation<K> snapshotOperation = new RocksDBIncrementalSnapshotOperation<>( this, @@ -363,21 +374,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { checkpointId, checkpointTimestamp); - synchronized (asyncSnapshotLock) { - if (db == null) { - throw new IOException("RocksDB closed."); - } - - if (kvStateInformation.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + - checkpointTimestamp + " . Returning null."); - } - return DoneFuture.nullValue(); - } - - snapshotOperation.takeSnapshot(); - } + snapshotOperation.takeSnapshot(); return new FutureTask<KeyedStateHandle>( new Callable<KeyedStateHandle>() { @@ -410,28 +407,18 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { final RocksDBFullSnapshotOperation<K> snapshotOperation; - // hold the db lock while operation on the db to guard us against async db disposal - synchronized (asyncSnapshotLock) { - - if (db != null) { - - if (kvStateInformation.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp + - " . 
Returning null."); - } - return DoneFuture.nullValue(); - } - - snapshotOperation = - new RocksDBFullSnapshotOperation<>(this, streamFactory, snapshotCloseableRegistry); - - snapshotOperation.takeDBSnapShot(checkpointId, timestamp); - } else { - throw new IOException("RocksDB closed."); + if (kvStateInformation.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp + + " . Returning null."); } + + return DoneFuture.nullValue(); } + snapshotOperation = new RocksDBFullSnapshotOperation<>(this, streamFactory, snapshotCloseableRegistry); + snapshotOperation.takeDBSnapShot(checkpointId, timestamp); + // implementation of the async IO operation, based on FutureTask AbstractAsyncCallableWithResources<KeyedStateHandle> ioCallable = new AbstractAsyncCallableWithResources<KeyedStateHandle>() { @@ -450,9 +437,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private void releaseSnapshotOperationResources() { // hold the db lock while operation on the db to guard us against async db disposal - synchronized (asyncSnapshotLock) { - snapshotOperation.releaseSnapshotResources(); - } + snapshotOperation.releaseSnapshotResources(); } @Override @@ -474,16 +459,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { public KeyGroupsStateHandle performOperation() throws Exception { long startTime = System.currentTimeMillis(); - synchronized (asyncSnapshotLock) { - // hold the db lock while operation on the db to guard us against async db disposal - if (db == null) { - throw new IOException("RocksDB closed."); - } - - snapshotOperation.writeDBSnapshot(); - snapshotOperation.createSnapshotResultStateHandleFromOutputStream(); + if (isStopped()) { + throw new IOException("RocksDB closed."); } + snapshotOperation.writeDBSnapshot(); + LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.", streamFactory, 
Thread.currentThread(), (System.currentTimeMillis() - startTime)); @@ -509,6 +490,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private final KeyGroupRangeOffsets keyGroupRangeOffsets; private final CheckpointStreamFactory checkpointStreamFactory; private final CloseableRegistry snapshotCloseableRegistry; + private final ResourceGuard.Lease dbLease; private long checkpointId; private long checkpointTimeStamp; @@ -519,17 +501,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private CheckpointStreamFactory.CheckpointStateOutputStream outStream; private DataOutputView outputView; - private KeyGroupsStateHandle snapshotResultStateHandle; RocksDBFullSnapshotOperation( RocksDBKeyedStateBackend<K> stateBackend, CheckpointStreamFactory checkpointStreamFactory, - CloseableRegistry registry) { + CloseableRegistry registry) throws IOException { this.stateBackend = stateBackend; this.checkpointStreamFactory = checkpointStreamFactory; this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(stateBackend.keyGroupRange); this.snapshotCloseableRegistry = registry; + this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource(); } /** @@ -575,11 +557,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } /** - * 4) Close the CheckpointStateOutputStream after writing and receive a state handle. + * 4) Returns a state handle to the snapshot after the snapshot procedure is completed and null before. 
* - * @throws IOException + * @return state handle to the completed snapshot */ - public void createSnapshotResultStateHandleFromOutputStream() throws IOException { + public KeyGroupsStateHandle getSnapshotResultStateHandle() throws IOException { if (snapshotCloseableRegistry.unregisterCloseable(outStream)) { @@ -587,9 +569,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { outStream = null; if (stateHandle != null) { - this.snapshotResultStateHandle = new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle); + return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle); } } + return null; } /** @@ -618,38 +601,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { IOUtils.closeQuietly(readOptions); readOptions = null; } - } - - /** - * Drop the created snapshot if we have ben cancelled. - */ - public void dropSnapshotResult() { - if (null != snapshotResultStateHandle) { - try { - snapshotResultStateHandle.discardState(); - } catch (Exception e) { - LOG.warn("Exception occurred during snapshot state handle cleanup.", e); - } - } - } - - /** - * Returns the current CheckpointStateOutputStream (when it was opened and not yet closed) into which we write - * the state snapshot. - * - * @return the current CheckpointStateOutputStream - */ - public CheckpointStreamFactory.CheckpointStateOutputStream getOutStream() { - return outStream; - } - /** - * Returns a state handle to the snapshot after the snapshot procedure is completed and null before. 
- * - * @return state handle to the completed snapshot - */ - public KeyGroupsStateHandle getSnapshotResultStateHandle() { - return snapshotResultStateHandle; + this.dbLease.close(); } private void writeKVStateMetaData() throws IOException { @@ -827,18 +780,22 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { // handles to the misc files in the current snapshot private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>(); + // This lease protects from concurrent disposal of the native rocksdb instance. + private final ResourceGuard.Lease dbLease; + private StreamStateHandle metaStateHandle = null; private RocksDBIncrementalSnapshotOperation( RocksDBKeyedStateBackend<K> stateBackend, CheckpointStreamFactory checkpointStreamFactory, long checkpointId, - long checkpointTimestamp) { + long checkpointTimestamp) throws IOException { this.stateBackend = stateBackend; this.checkpointStreamFactory = checkpointStreamFactory; this.checkpointId = checkpointId; this.checkpointTimestamp = checkpointTimestamp; + this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource(); } private StreamStateHandle materializeStateData(Path filePath) throws Exception { @@ -919,7 +876,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } void takeSnapshot() throws Exception { - assert (Thread.holdsLock(stateBackend.asyncSnapshotLock)); final long lastCompletedCheckpoint; @@ -940,6 +896,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { // save state data backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId); + + LOG.trace("Local RocksDB checkpoint goes to backup path {}.", backupPath); + backupFileSystem = backupPath.getFileSystem(); if (backupFileSystem.exists(backupPath)) { throw new IllegalStateException("Unexpected existence of the backup directory."); @@ -1013,6 +972,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> { void releaseResources(boolean canceled) { + dbLease.close(); + if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) { try { closeableRegistry.close(); @@ -1024,6 +985,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { if (backupPath != null) { try { if (backupFileSystem.exists(backupPath)) { + + LOG.trace("Deleting local RocksDB backup path {}.", backupPath); backupFileSystem.delete(backupPath, true); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index 08d661c..a2ec052 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -215,46 +215,6 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa } @Test - public void testRunningSnapshotAfterBackendClosed() throws Exception { - setupRocksKeyedStateBackend(); - RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, - CheckpointOptions.forFullCheckpoint()); - - RocksDB spyDB = keyedStateBackend.db; - - if (!enableIncrementalCheckpointing) { - verify(spyDB, times(1)).getSnapshot(); - verify(spyDB, times(0)).releaseSnapshot(any(Snapshot.class)); - } - - this.keyedStateBackend.dispose(); - verify(spyDB, times(1)).close(); - assertEquals(null, 
keyedStateBackend.db); - - //Ensure every RocksObjects not closed yet - for (RocksObject rocksCloseable : allCreatedCloseables) { - verify(rocksCloseable, times(0)).close(); - } - - Thread asyncSnapshotThread = new Thread(snapshot); - asyncSnapshotThread.start(); - try { - snapshot.get(); - fail(); - } catch (Exception ignored) { - - } - - asyncSnapshotThread.join(); - - //Ensure every RocksObject was closed exactly once - for (RocksObject rocksCloseable : allCreatedCloseables) { - verify(rocksCloseable, times(1)).close(); - } - - } - - @Test public void testCorrectMergeOperatorSet() throws IOException { final ColumnFamilyOptions columnFamilyOptions = spy(new ColumnFamilyOptions()); @@ -289,8 +249,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa setupRocksKeyedStateBackend(); try { - RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, - CheckpointOptions.forFullCheckpoint()); + RunnableFuture<KeyedStateHandle> snapshot = + keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); RocksDB spyDB = keyedStateBackend.db; @@ -299,10 +259,6 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa verify(spyDB, times(0)).releaseSnapshot(any(Snapshot.class)); } - this.keyedStateBackend.dispose(); - verify(spyDB, times(1)).close(); - assertEquals(null, keyedStateBackend.db); - //Ensure every RocksObjects not closed yet for (RocksObject rocksCloseable : allCreatedCloseables) { verify(rocksCloseable, times(0)).close(); @@ -310,10 +266,16 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa snapshot.cancel(true); + this.keyedStateBackend.dispose(); + + verify(spyDB, times(1)).close(); + assertEquals(null, keyedStateBackend.db); + //Ensure every RocksObjects was closed exactly once for (RocksObject rocksCloseable : allCreatedCloseables) { verify(rocksCloseable, times(1)).close(); } + } finally { 
keyedStateBackend.dispose(); keyedStateBackend = null; http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-core/src/main/java/org/apache/flink/util/ResourceGuard.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/ResourceGuard.java b/flink-core/src/main/java/org/apache/flink/util/ResourceGuard.java new file mode 100644 index 0000000..83660a6 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/ResourceGuard.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import java.io.IOException; +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This class is a guard for shared resources with the following invariants. The resource can be acquired by multiple + * clients in parallel through the {@link #acquireResource()} call. As a result of the call, each client gets a + * {@link Lease}. The {@link #close()} method of the lease releases the resources and reduces the client count in + * the {@link ResourceGuard} object. 
+ * The protected resource should only be disposed once the corresponding resource guard is successfully closed, but + * the guard can only be closed once all clients that acquired a lease for the guarded resource have released it. + * Before this has happened, the call to {@link #close()} will block until the zero-open-leases condition is triggered. + * After the resource guard is closed, calls to {@link #acquireResource()} will fail with an exception. Notice that + * clients are obviously responsible for releasing the resource after usage. All clients are considered equal, i.e. there + * is only a global count maintained of how many times the resource was acquired but not by whom. + */ +public class ResourceGuard implements AutoCloseable, Serializable { + + private static final long serialVersionUID = 1L; + + /** + * The object that serves as lock for count and the closed-flag. + */ + private final SerializableObject lock; + + /** + * Number of clients that have ongoing access to the resource. + */ + private volatile int leaseCount; + + /** + * This flag indicates if it is still possible to acquire access to the resource. + */ + private volatile boolean closed; + + public ResourceGuard() { + this.lock = new SerializableObject(); + this.leaseCount = 0; + this.closed = false; + } + + /** + * Acquires access for one new client to the guarded resource. + * + * @throws IOException when the resource guard is already closed. + */ + public Lease acquireResource() throws IOException { + + synchronized (lock) { + + if (closed) { + throw new IOException("Resource guard was already closed."); + } + + ++leaseCount; + } + + return new Lease(); + } + + /** + * Releases access for one client of the guarded resource. This method must only be called after a matching call to + * {@link #acquireResource()}. 
+ */ + private void releaseResource() { + + synchronized (lock) { + + --leaseCount; + + if (closed && leaseCount == 0) { + lock.notifyAll(); + } + } + } + + /** + * Closes the resource guard. This method will block until all calls to {@link #acquireResource()} have seen their + * matching call to {@link #releaseResource()}. + */ + @Override + public void close() { + + synchronized (lock) { + + closed = true; + + while (leaseCount > 0) { + + try { + lock.wait(); + } catch (InterruptedException ignore) { + // Even on interruption, we cannot terminate the loop until all open leases are closed. + } + } + } + } + + /** + * Returns true if the resource guard is closed, i.e. after {@link #close()} was called. + */ + public boolean isClosed() { + return closed; + } + + /** + * Returns the current count of open leases. + */ + public int getLeaseCount() { + return leaseCount; + } + + /** + * A lease is issued by the {@link ResourceGuard} as a result of calls to {@link #acquireResource()}. The owner of the + * lease can release it via the {@link #close()} call. + */ + public class Lease implements AutoCloseable { + + private final AtomicBoolean closed; + + private Lease() { + this.closed = new AtomicBoolean(false); + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + releaseResource(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-core/src/main/java/org/apache/flink/util/SerializableObject.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/SerializableObject.java b/flink-core/src/main/java/org/apache/flink/util/SerializableObject.java new file mode 100644 index 0000000..ec01731 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/SerializableObject.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +/** + * A simple object that only implements {@link java.io.Serializable}, so it can be used + * in serializable classes. + */ +public class SerializableObject implements java.io.Serializable { + + private static final long serialVersionUID = -7322636177391854669L; +} http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-core/src/test/java/org/apache/flink/util/ResourceGuardTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/util/ResourceGuardTest.java b/flink-core/src/test/java/org/apache/flink/util/ResourceGuardTest.java new file mode 100644 index 0000000..a030a81 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/util/ResourceGuardTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +public class ResourceGuardTest { + + @Test + public void testClose() { + ResourceGuard resourceGuard = new ResourceGuard(); + Assert.assertFalse(resourceGuard.isClosed()); + resourceGuard.close(); + Assert.assertTrue(resourceGuard.isClosed()); + try { + resourceGuard.acquireResource(); + Assert.fail(); + } catch (IOException ignore) { + } + } + + @Test + public void testAcquireReleaseClose() throws IOException { + ResourceGuard resourceGuard = new ResourceGuard(); + ResourceGuard.Lease lease = resourceGuard.acquireResource(); + Assert.assertEquals(1, resourceGuard.getLeaseCount()); + lease.close(); + Assert.assertEquals(0, resourceGuard.getLeaseCount()); + resourceGuard.close(); + Assert.assertTrue(resourceGuard.isClosed()); + } + + @Test + public void testCloseBlockIfAcquired() throws Exception { + ResourceGuard resourceGuard = new ResourceGuard(); + ResourceGuard.Lease lease_1 = resourceGuard.acquireResource(); + AtomicBoolean checker = new AtomicBoolean(true); + + Thread closerThread = new Thread() { + @Override + public void run() { + try { + // this line should block until all acquires are matched by releases. 
+ resourceGuard.close(); + checker.set(false); + } catch (Exception ignore) { + checker.set(false); + } + } + }; + + closerThread.start(); + + ResourceGuard.Lease lease_2 = resourceGuard.acquireResource(); + lease_2.close(); + Assert.assertTrue(checker.get()); + + // this matches the first acquire and will unblock the close. + lease_1.close(); + closerThread.join(60_000); + Assert.assertFalse(checker.get()); + } + + @Test + public void testInterruptHandledCorrectly() throws Exception { + ResourceGuard resourceGuard = new ResourceGuard(); + ResourceGuard.Lease lease = resourceGuard.acquireResource(); + AtomicBoolean checker = new AtomicBoolean(true); + + Thread closerThread = new Thread() { + @Override + public void run() { + try { + // this line should block until all acquires are matched by releases. + resourceGuard.close(); + checker.set(false); + } catch (Exception ignore) { + checker.set(false); + } + } + }; + + closerThread.start(); + closerThread.interrupt(); + + Assert.assertTrue(checker.get()); + + lease.close(); + closerThread.join(60_000); + Assert.assertFalse(checker.get()); + } + + @Test + public void testLeaseCloseIsIdempotent() throws Exception { + ResourceGuard resourceGuard = new ResourceGuard(); + ResourceGuard.Lease lease_1 = resourceGuard.acquireResource(); + ResourceGuard.Lease lease_2 = resourceGuard.acquireResource(); + Assert.assertEquals(2, resourceGuard.getLeaseCount()); + lease_1.close(); + Assert.assertEquals(1, resourceGuard.getLeaseCount()); + lease_1.close(); + Assert.assertEquals(1, resourceGuard.getLeaseCount()); + lease_2.close(); + Assert.assertEquals(0, resourceGuard.getLeaseCount()); + ResourceGuard.Lease lease_3 = resourceGuard.acquireResource(); + Assert.assertEquals(1, resourceGuard.getLeaseCount()); + lease_2.close(); + Assert.assertEquals(1, resourceGuard.getLeaseCount()); + lease_3.close(); + Assert.assertEquals(0, resourceGuard.getLeaseCount()); + resourceGuard.close(); + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index 1fb03d7..9edf8fc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -233,19 +233,25 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { @Override protected void acquireResources() throws Exception { - out = streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp); - closeStreamOnCancelRegistry.registerCloseable(out); + openOutStream(); } @Override protected void releaseResources() throws Exception { - if (closeStreamOnCancelRegistry.unregisterCloseable(out)) { - IOUtils.closeQuietly(out); - } + closeOutStream(); } @Override protected void stopOperation() throws Exception { + closeOutStream(); + } + + private void openOutStream() throws Exception { + out = streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp); + closeStreamOnCancelRegistry.registerCloseable(out); + } + + private void closeOutStream() { if (closeStreamOnCancelRegistry.unregisterCloseable(out)) { IOUtils.closeQuietly(out); } http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableObject.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableObject.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableObject.java deleted file mode 100644 index af6fa16..0000000 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableObject.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.util; - -/** - * A simple object that only implements {@link java.io.Serializable}, so it can be used - * in serializable classes. 
- */ -public class SerializableObject implements java.io.Serializable { - - private static final long serialVersionUID = -7322636177391854669L; -} http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index 1788434..47daa01 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -33,7 +33,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; -import org.apache.flink.runtime.util.SerializableObject; +import org.apache.flink.util.SerializableObject; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index dbf131f..50b5e26 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -66,14 +66,13 @@ import org.apache.flink.runtime.state.internal.InternalListState; import 
org.apache.flink.runtime.state.internal.InternalReducingState; import org.apache.flink.runtime.state.internal.InternalValueState; import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; +import org.apache.flink.shaded.guava18.com.google.common.base.Joiner; import org.apache.flink.types.IntValue; import org.apache.flink.util.FutureUtil; import org.apache.flink.util.IOUtils; import org.apache.flink.util.StateMigrationException; import org.apache.flink.util.TestLogger; -import org.apache.flink.shaded.guava18.com.google.common.base.Joiner; - import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; @@ -2914,6 +2913,74 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten } } + /** + * The purpose of this test is to check that parallel snapshots are possible, and work even if a previous snapshot + * is still running and blocking. + */ + @Test + public void testParallelAsyncSnapshots() throws Exception { + OneShotLatch blocker = new OneShotLatch(); + OneShotLatch waiter = new OneShotLatch(); + BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024); + streamFactory.setWaiterLatch(waiter); + streamFactory.setBlockerLatch(blocker); + streamFactory.setAfterNumberInvocations(10); + + final AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); + + try { + + if (!backend.supportsAsynchronousSnapshots()) { + return; + } + + // insert some data to the backend. 
+ InternalValueState<VoidNamespace, Integer> valueState = backend.createValueState( + VoidNamespaceSerializer.INSTANCE, + new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); + + valueState.setCurrentNamespace(VoidNamespace.INSTANCE); + + for (int i = 0; i < 10; ++i) { + backend.setCurrentKey(i); + valueState.update(i); + } + + RunnableFuture<KeyedStateHandle> snapshot1 = + backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint()); + + Thread runner1 = new Thread(snapshot1, "snapshot-1-runner"); + runner1.start(); + // after this call returns, we have a running snapshot-1 that is blocked in IO. + waiter.await(); + + // do some updates in between the snapshots. + for (int i = 5; i < 15; ++i) { + backend.setCurrentKey(i); + valueState.update(i + 1); + } + + // we don't want to block the second snapshot. + streamFactory.setWaiterLatch(null); + streamFactory.setBlockerLatch(null); + + RunnableFuture<KeyedStateHandle> snapshot2 = + backend.snapshot(1L, 1L, streamFactory, CheckpointOptions.forFullCheckpoint()); + + Thread runner2 = new Thread(snapshot2,"snapshot-2-runner"); + runner2.start(); + // snapshot-2 should run and succeed, while snapshot-1 is still running and blocked in IO. + snapshot2.get(); + + // we release the blocking IO so that snapshot-1 can also finish and succeed. 
+ blocker.trigger(); + snapshot1.get(); + + } finally { + backend.dispose(); + } + } + @Test public void testAsyncSnapshot() throws Exception { OneShotLatch waiter = new OneShotLatch(); http://git-wip-us.apache.org/repos/asf/flink/blob/7d026aa7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java index c43345b..214d5c2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java @@ -19,8 +19,8 @@ package org.apache.flink.streaming.api.functions.sink; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.flink.util.SerializableObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
