Repository: flink
Updated Branches:
  refs/heads/master 5795ebe18 -> 8ba5c7a37


[FLINK-6364] [checkpoint] Additional minor review changes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ba5c7a3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ba5c7a3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ba5c7a3

Branch: refs/heads/master
Commit: 8ba5c7a37ff56cc9b60277ef13827614a1b3a10a
Parents: 6e94cf1
Author: Stefan Richter <[email protected]>
Authored: Fri May 5 13:09:46 2017 +0200
Committer: Stefan Richter <[email protected]>
Committed: Fri May 5 16:30:06 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 18 ++++----
 .../state/RocksDBStateBackendTest.java          |  1 -
 .../state/AbstractKeyedStateBackend.java        | 12 +----
 .../state/heap/HeapKeyedStateBackend.java       |  2 +-
 .../runtime/state/StateBackendTestBase.java     |  6 +--
 .../util/BlockerCheckpointStreamFactory.java    | 10 ++++-
 .../api/operators/AbstractStreamOperator.java   |  2 +-
 ...tractEventTimeWindowCheckpointingITCase.java | 10 ++++-
 ...ckendEventTimeWindowCheckpointingITCase.java | 46 ++++++++++++++++++++
 9 files changed, 79 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8ba5c7a3/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index ee5f956..b8e60cd 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -705,7 +705,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
        }
 
-       private static class RocksDBIncrementalSnapshotOperation {
+       private static final class RocksDBIncrementalSnapshotOperation {
 
                private final RocksDBKeyedStateBackend<?> stateBackend;
 
@@ -717,22 +717,22 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                private Map<String, StreamStateHandle> baseSstFiles;
 
-               private List<KeyedBackendSerializationProxy.StateMetaInfo<?, 
?>> stateMetaInfos = new ArrayList<>();
+               private final 
List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new 
ArrayList<>();
 
                private FileSystem backupFileSystem;
                private Path backupPath;
 
                // Registry for all opened i/o streams
-               private CloseableRegistry closeableRegistry = new 
CloseableRegistry();
+               private final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
 
                // new sst files since the last completed checkpoint
-               private Map<String, StreamStateHandle> newSstFiles = new 
HashMap<>();
+               private final Map<String, StreamStateHandle> newSstFiles = new 
HashMap<>();
 
                // old sst files which have been materialized in previous 
completed checkpoints
-               private Map<String, StreamStateHandle> oldSstFiles = new 
HashMap<>();
+               private final Map<String, StreamStateHandle> oldSstFiles = new 
HashMap<>();
 
                // handles to the misc files in the current snapshot
-               private Map<String, StreamStateHandle> miscFiles = new 
HashMap<>();
+               private final Map<String, StreamStateHandle> miscFiles = new 
HashMap<>();
 
                private StreamStateHandle metaStateHandle = null;
 
@@ -753,7 +753,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
 
                        try {
-                               final byte[] buffer = new byte[1024];
+                               final byte[] buffer = new byte[8 * 1024];
 
                                FileSystem backupFileSystem = 
backupPath.getFileSystem();
                                inputStream = backupFileSystem.open(filePath);
@@ -966,7 +966,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        }
 
        @Override
-       public void notifyOfCompletedCheckpoint(long completedCheckpointId) {
+       public void notifyCheckpointComplete(long completedCheckpointId) {
                synchronized (asyncSnapshotLock) {
                        if (completedCheckpointId < lastCompletedCheckpointId) {
                                return;
@@ -1237,7 +1237,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                outputStream = 
restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
                                
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
 
-                               byte[] buffer = new byte[1024];
+                               byte[] buffer = new byte[8 * 1024];
                                while (true) {
                                        int numBytes = inputStream.read(buffer);
                                        if (numBytes == -1) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba5c7a3/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index fad1559..99b71c5 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -45,7 +45,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.rocksdb.Checkpoint;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ReadOptions;

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba5c7a3/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 61f397c..4f3ed01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -61,7 +61,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * @param <K> Type of the key by which state is keyed.
  */
 public abstract class AbstractKeyedStateBackend<K>
-               implements KeyedStateBackend<K>, 
Snapshotable<KeyedStateHandle>, Closeable {
+               implements KeyedStateBackend<K>, 
Snapshotable<KeyedStateHandle>, Closeable, CheckpointListener {
 
        /** {@link TypeSerializer} for our key. */
        protected final TypeSerializer<K> keySerializer;
@@ -212,16 +212,6 @@ public abstract class AbstractKeyedStateBackend<K>
                        MapStateDescriptor<UK, UV> stateDesc) throws Exception;
 
        /**
-        * Called when the checkpoint with the given ID is completed and 
acknowledged on the JobManager.
-        *
-        * @param checkpointId The ID of the checkpoint that has been completed.
-        *
-        * @throws Exception Exceptions during checkpoint acknowledgement may 
be forwarded and will cause
-        *                   the program to fail and enter recovery.
-        */
-       public abstract void notifyOfCompletedCheckpoint(long checkpointId) 
throws Exception;
-
-       /**
         * @see KeyedStateBackend
         */
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba5c7a3/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index ead89b3..aecc72e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -430,7 +430,7 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        }
 
        @Override
-       public void notifyOfCompletedCheckpoint(long checkpointId) {
+       public void notifyCheckpointComplete(long checkpointId) {
                //Nothing to do
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba5c7a3/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 60f9c81..7152bfc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -43,8 +43,8 @@ import 
org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -69,6 +69,7 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -98,7 +99,6 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import org.junit.rules.ExpectedException;
 
 
 /**
@@ -2235,7 +2235,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                BlockerCheckpointStreamFactory streamFactory = new 
BlockerCheckpointStreamFactory(1024 * 1024);
                streamFactory.setWaiterLatch(waiter);
                streamFactory.setBlockerLatch(blocker);
-               streamFactory.setAfterNumberInvocations(100);
+               streamFactory.setAfterNumberInvocations(10);
 
                AbstractKeyedStateBackend<Integer> backend = null;
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba5c7a3/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
index 291f3ed..1e31490 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
@@ -99,6 +99,14 @@ public class BlockerCheckpointStreamFactory implements 
CheckpointStreamFactory {
                                }
                        }
 
+                       //We override this to ensure that writes go through the 
blocking #write(int) method!
+                       @Override
+                       public void write(byte[] b, int off, int len) throws 
IOException {
+                               for (int i = 0; i < len; i++) {
+                                       write(b[off + i]);
+                               }
+                       }
+
                        @Override
                        public void close() {
                                super.close();
@@ -115,4 +123,4 @@ public class BlockerCheckpointStreamFactory implements 
CheckpointStreamFactory {
        public void close() throws Exception {
 
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba5c7a3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index d45ad42..8c1caee 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -506,7 +506,7 @@ public abstract class AbstractStreamOperator<OUT>
        @Override
        public void notifyOfCompletedCheckpoint(long checkpointId) throws 
Exception {
                if (keyedStateBackend != null) {
-                       
keyedStateBackend.notifyOfCompletedCheckpoint(checkpointId);
+                       
keyedStateBackend.notifyCheckpointComplete(checkpointId);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba5c7a3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index d91c57f..dbef01f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -94,7 +94,7 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
        }
 
        enum StateBackendEnum {
-               MEM, FILE, ROCKSDB_FULLY_ASYNC, MEM_ASYNC, FILE_ASYNC
+               MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, MEM_ASYNC, 
FILE_ASYNC
        }
 
        @BeforeClass
@@ -143,6 +143,14 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                                this.stateBackend = rdb;
                                break;
                        }
+                       case ROCKSDB_INCREMENTAL: {
+                               String rocksDb = 
tempFolder.newFolder().getAbsolutePath();
+                               RocksDBStateBackend rdb =
+                                       new RocksDBStateBackend(new 
MemoryStateBackend(MAX_MEM_STATE_SIZE), true);
+                               rdb.setDbStoragePath(rocksDb);
+                               this.stateBackend = rdb;
+                               break;
+                       }
 
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba5c7a3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..352f9f7
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+public class IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase 
extends AbstractEventTimeWindowCheckpointingITCase {
+
+       public IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() {
+               super(StateBackendEnum.ROCKSDB_INCREMENTAL);
+       }
+
+       @Override
+       protected int numElementsPerKey() {
+               return 3000;
+       }
+
+       @Override
+       protected int windowSize() {
+               return 1000;
+       }
+
+       @Override
+       protected int windowSlide() {
+               return 100;
+       }
+
+       @Override
+       protected int numKeys() {
+               return 100;
+       }
+}

Reply via email to