Repository: flink
Updated Branches:
  refs/heads/master 63c04a516 -> 38003c282


[FLINK-6475] [checkpoint] Incremental snapshots in RocksDB should not hold lock during async file upload


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/38003c28
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/38003c28
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/38003c28

Branch: refs/heads/master
Commit: 38003c2829879f65e26914c0fedb102052fe201f
Parents: b8ffacb
Author: Stefan Richter <[email protected]>
Authored: Sun May 7 20:22:34 2017 +0200
Committer: Stefan Richter <[email protected]>
Committed: Sun May 7 22:00:43 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 63 +++++++++-----------
 .../flink/util/AbstractCloseableRegistry.java   |  4 ++
 .../state/AbstractKeyedStateBackend.java        |  4 ++
 .../state/DefaultOperatorStateBackend.java      |  1 +
 .../api/operators/AbstractStreamOperator.java   |  3 -
 5 files changed, 37 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/38003c28/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 3cb21ac..f5dddd6 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -844,54 +844,47 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
                KeyedStateHandle materializeSnapshot() throws Exception {
 
-                       synchronized (stateBackend.asyncSnapshotLock) {
+                       
stateBackend.cancelStreamRegistry.registerClosable(closeableRegistry);
 
-                               if (stateBackend.db == null) {
-                                       throw new IOException("RocksDB 
closed.");
-                               }
-
-                               
stateBackend.cancelStreamRegistry.registerClosable(closeableRegistry);
+                       // write meta data
+                       metaStateHandle = materializeMetaData();
 
-                               // write meta data
-                               metaStateHandle = materializeMetaData();
+                       // write state data
+                       
Preconditions.checkState(backupFileSystem.exists(backupPath));
 
-                               // write state data
-                               
Preconditions.checkState(backupFileSystem.exists(backupPath));
+                       FileStatus[] fileStatuses = 
backupFileSystem.listStatus(backupPath);
+                       if (fileStatuses != null) {
+                               for (FileStatus fileStatus : fileStatuses) {
+                                       Path filePath = fileStatus.getPath();
+                                       String fileName = filePath.getName();
 
-                               FileStatus[] fileStatuses = 
backupFileSystem.listStatus(backupPath);
-                               if (fileStatuses != null) {
-                                       for (FileStatus fileStatus : 
fileStatuses) {
-                                               Path filePath = 
fileStatus.getPath();
-                                               String fileName = 
filePath.getName();
+                                       if (fileName.endsWith(SST_FILE_SUFFIX)) 
{
+                                               StreamStateHandle fileHandle =
+                                                       baseSstFiles == null ? 
null : baseSstFiles.get(fileName);
 
-                                               if 
(fileName.endsWith(SST_FILE_SUFFIX)) {
-                                                       StreamStateHandle 
fileHandle =
-                                                               baseSstFiles == 
null ? null : baseSstFiles.get(fileName);
+                                               if (fileHandle == null) {
+                                                       fileHandle = 
materializeStateData(filePath);
 
-                                                       if (fileHandle == null) 
{
-                                                               fileHandle = 
materializeStateData(filePath);
-
-                                                               
newSstFiles.put(fileName, fileHandle);
-                                                       } else {
-                                                               
oldSstFiles.put(fileName, fileHandle);
-                                                       }
+                                                       
newSstFiles.put(fileName, fileHandle);
                                                } else {
-                                                       StreamStateHandle 
fileHandle = materializeStateData(filePath);
-                                                       miscFiles.put(fileName, 
fileHandle);
+                                                       
oldSstFiles.put(fileName, fileHandle);
                                                }
+                                       } else {
+                                               StreamStateHandle fileHandle = 
materializeStateData(filePath);
+                                               miscFiles.put(fileName, 
fileHandle);
                                        }
                                }
+                       }
 
-                               Map<String, StreamStateHandle> sstFiles = new 
HashMap<>(newSstFiles.size() + oldSstFiles.size());
-                               sstFiles.putAll(newSstFiles);
-                               sstFiles.putAll(oldSstFiles);
+                       Map<String, StreamStateHandle> sstFiles = new 
HashMap<>(newSstFiles.size() + oldSstFiles.size());
+                       sstFiles.putAll(newSstFiles);
+                       sstFiles.putAll(oldSstFiles);
 
-                               
stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
+                       stateBackend.materializedSstFiles.put(checkpointId, 
sstFiles);
 
-                               return new 
RocksDBIncrementalKeyedStateHandle(stateBackend.jobId,
-                                       stateBackend.operatorIdentifier, 
stateBackend.keyGroupRange,
-                                       checkpointId, newSstFiles, oldSstFiles, 
miscFiles, metaStateHandle);
-                       }
+                       return new 
RocksDBIncrementalKeyedStateHandle(stateBackend.jobId,
+                               stateBackend.operatorIdentifier, 
stateBackend.keyGroupRange,
+                               checkpointId, newSstFiles, oldSstFiles, 
miscFiles, metaStateHandle);
                }
 
                void stop() {

http://git-wip-us.apache.org/repos/asf/flink/blob/38003c28/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
index 766ede9..f949779 100644
--- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -90,6 +90,10 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
        public void close() throws IOException {
                synchronized (getSynchronizationLock()) {
 
+                       if (closed) {
+                               return;
+                       }
+
                        IOUtils.closeAllQuietly(closeableToRef.keySet());
 
                        closeableToRef.clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/38003c28/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 4f3ed01..47ebe3b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Closeable;
@@ -122,6 +123,9 @@ public abstract class AbstractKeyedStateBackend<K>
         */
        @Override
        public void dispose() {
+
+               IOUtils.closeQuietly(this);
+
                if (kvStateRegistry != null) {
                        kvStateRegistry.unregisterAll();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/38003c28/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index ec4aa81..ab0c1f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -120,6 +120,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
        @Override
        public void dispose() {
+               IOUtils.closeQuietly(this);
                registeredStates.clear();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/38003c28/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 8c1caee..057df2b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -26,7 +26,6 @@ import java.util.Collection;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
@@ -358,12 +357,10 @@ public abstract class AbstractStreamOperator<OUT>
        public void dispose() throws Exception {
 
                if (operatorStateBackend != null) {
-                       IOUtils.closeQuietly(operatorStateBackend);
                        operatorStateBackend.dispose();
                }
 
                if (keyedStateBackend != null) {
-                       IOUtils.closeQuietly(keyedStateBackend);
                        keyedStateBackend.dispose();
                }
        }

Reply via email to