[FLINK-8695] [rocksdb] Move flink-statebackend-rocksdb from 'flink-contrib' to 'flink-state-backends'.

This closes #5523


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2f7392d7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2f7392d7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2f7392d7

Branch: refs/heads/master
Commit: 2f7392d77d85823d3db1f1e5a8d4f6c94358d773
Parents: 3d52f52
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 19 10:31:44 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 19 18:37:49 2018 +0100

----------------------------------------------------------------------
 .../flink-statebackend-rocksdb/pom.xml          |   88 -
 .../streaming/state/AbstractRocksDBState.java   |  267 ---
 .../contrib/streaming/state/OptionsFactory.java |   74 -
 .../streaming/state/PredefinedOptions.java      |  209 --
 .../state/RocksDBAggregatingState.java          |  206 --
 .../streaming/state/RocksDBFoldingState.java    |  122 --
 .../state/RocksDBKeyedStateBackend.java         | 2033 ------------------
 .../streaming/state/RocksDBListState.java       |  230 --
 .../streaming/state/RocksDBMapState.java        |  532 -----
 .../streaming/state/RocksDBReducingState.java   |  189 --
 .../streaming/state/RocksDBStateBackend.java    |  691 ------
 .../state/RocksDBStateBackendFactory.java       |   49 -
 .../streaming/state/RocksDBValueState.java      |  106 -
 .../state/RocksDBAsyncSnapshotTest.java         |  505 -----
 .../streaming/state/RocksDBInitResetTest.java   |   32 -
 .../state/RocksDBMergeIteratorTest.java         |  152 --
 .../state/RocksDBStateBackendConfigTest.java    |  416 ----
 .../state/RocksDBStateBackendFactoryTest.java   |  176 --
 .../state/RocksDBStateBackendTest.java          |  526 -----
 .../state/RocksDbMultiClassLoaderTest.java      |   73 -
 .../RocksDBListStatePerformanceTest.java        |  168 --
 .../state/benchmark/RocksDBPerformanceTest.java |  204 --
 .../src/test/resources/log4j-test.properties    |   27 -
 flink-contrib/pom.xml                           |    1 -
 .../flink-statebackend-rocksdb/pom.xml          |   94 +
 .../streaming/state/AbstractRocksDBState.java   |  267 +++
 .../contrib/streaming/state/OptionsFactory.java |   74 +
 .../streaming/state/PredefinedOptions.java      |  209 ++
 .../state/RocksDBAggregatingState.java          |  206 ++
 .../streaming/state/RocksDBFoldingState.java    |  122 ++
 .../state/RocksDBKeyedStateBackend.java         | 2033 ++++++++++++++++++
 .../streaming/state/RocksDBListState.java       |  230 ++
 .../streaming/state/RocksDBMapState.java        |  532 +++++
 .../streaming/state/RocksDBReducingState.java   |  189 ++
 .../streaming/state/RocksDBStateBackend.java    |  691 ++++++
 .../state/RocksDBStateBackendFactory.java       |   49 +
 .../streaming/state/RocksDBValueState.java      |  106 +
 .../state/RocksDBAsyncSnapshotTest.java         |  505 +++++
 .../streaming/state/RocksDBInitResetTest.java   |   32 +
 .../state/RocksDBMergeIteratorTest.java         |  152 ++
 .../state/RocksDBStateBackendConfigTest.java    |  416 ++++
 .../state/RocksDBStateBackendFactoryTest.java   |  176 ++
 .../state/RocksDBStateBackendTest.java          |  526 +++++
 .../state/RocksDbMultiClassLoaderTest.java      |   73 +
 .../RocksDBListStatePerformanceTest.java        |  168 ++
 .../state/benchmark/RocksDBPerformanceTest.java |  204 ++
 .../src/test/resources/log4j-test.properties    |   27 +
 flink-state-backends/pom.xml                    |   42 +
 pom.xml                                         |    1 +
 tools/travis_mvn_watchdog.sh                    |    2 +-
 50 files changed, 7125 insertions(+), 7077 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml b/flink-contrib/flink-statebackend-rocksdb/pom.xml
deleted file mode 100644
index f9a4910..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/pom.xml
+++ /dev/null
@@ -1,88 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-contrib</artifactId>
-               <version>1.5-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
-       <name>flink-statebackend-rocksdb</name>
-
-       <packaging>jar</packaging>
-
-       <dependencies>
-
-               <!-- core dependencies -->
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-clients_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.rocksdb</groupId>
-                       <artifactId>rocksdbjni</artifactId>
-                       <version>5.6.1</version>
-               </dependency>
-
-               <!-- test dependencies -->
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-runtime_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-       </dependencies>
-</project>
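
For context: only the module's location in the source tree changes; its Maven
coordinates stay the same. A downstream project would keep declaring the
dependency as before (the ${flink.version} property below is a placeholder for
the user's Flink version):

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>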

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
deleted file mode 100644
index 969a1fc..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.util.Preconditions;
-
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.WriteOptions;
-
-import java.io.IOException;
-
-/**
- * Base class for {@link State} implementations that store state in a RocksDB database.
- *
- * <p>State is not stored in this class but in the {@link org.rocksdb.RocksDB} instance that
- * the {@link RocksDBStateBackend} manages and checkpoints.
- *
- * @param <K> The type of the key.
- * @param <N> The type of the namespace.
- * @param <S> The type of {@link State}.
- * @param <SD> The type of {@link StateDescriptor}.
- */
-public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, V>, V>
-               implements InternalKvState<N>, State {
-
-       /** Serializer for the namespace. */
-       final TypeSerializer<N> namespaceSerializer;
-
-       /** The current namespace, which the next value methods will refer to. */
-       private N currentNamespace;
-
-       /** Backend that holds the actual RocksDB instance where we store state. */
-       protected RocksDBKeyedStateBackend<K> backend;
-
-       /** The column family of this particular instance of state. */
-       protected ColumnFamilyHandle columnFamily;
-
-       /** State descriptor from which to create this state instance. */
-       protected final SD stateDesc;
-
-       /**
-        * We disable writes to the write-ahead-log here.
-        */
-       private final WriteOptions writeOptions;
-
-       protected final ByteArrayOutputStreamWithPos keySerializationStream;
-       protected final DataOutputView keySerializationDataOutputView;
-
-       private final boolean ambiguousKeyPossible;
-
-       /**
-        * Creates a new RocksDB backed state.
-        *  @param namespaceSerializer The serializer for the namespace.
-        */
-       protected AbstractRocksDBState(
-                       ColumnFamilyHandle columnFamily,
-                       TypeSerializer<N> namespaceSerializer,
-                       SD stateDesc,
-                       RocksDBKeyedStateBackend<K> backend) {
-
-               this.namespaceSerializer = namespaceSerializer;
-               this.backend = backend;
-
-               this.columnFamily = columnFamily;
-
-               writeOptions = new WriteOptions();
-               writeOptions.setDisableWAL(true);
-               this.stateDesc = Preconditions.checkNotNull(stateDesc, "State Descriptor");
-
-               this.keySerializationStream = new ByteArrayOutputStreamWithPos(128);
-               this.keySerializationDataOutputView = new DataOutputViewStreamWrapper(keySerializationStream);
-               this.ambiguousKeyPossible = (backend.getKeySerializer().getLength() < 0)
-                               && (namespaceSerializer.getLength() < 0);
-       }
-
-       // ------------------------------------------------------------------------
-
-       @Override
-       public void clear() {
-               try {
-                       writeCurrentKeyWithGroupAndNamespace();
-                       byte[] key = keySerializationStream.toByteArray();
-                       backend.db.remove(columnFamily, writeOptions, key);
-               } catch (IOException | RocksDBException e) {
-                       throw new RuntimeException("Error while removing entry from RocksDB", e);
-               }
-       }
-
-       @Override
-       public void setCurrentNamespace(N namespace) {
-               this.currentNamespace = Preconditions.checkNotNull(namespace, "Namespace");
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
-               Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
-
-               //TODO make KvStateSerializer key-group aware to save this round trip and key-group computation
-               Tuple2<K, N> des = KvStateSerializer.<K, N>deserializeKeyAndNamespace(
-                               serializedKeyAndNamespace,
-                               backend.getKeySerializer(),
-                               namespaceSerializer);
-
-               int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
-
-               // we cannot reuse the keySerializationStream member since this method
-               // is called concurrently to the other ones and it may thus contain garbage
-               ByteArrayOutputStreamWithPos tmpKeySerializationStream = new ByteArrayOutputStreamWithPos(128);
-               DataOutputViewStreamWrapper tmpKeySerializationDateDataOutputView = new DataOutputViewStreamWrapper(tmpKeySerializationStream);
-
-               writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
-                       tmpKeySerializationStream, tmpKeySerializationDateDataOutputView);
-
-               return backend.db.get(columnFamily, tmpKeySerializationStream.toByteArray());
-       }
-
-       protected void writeCurrentKeyWithGroupAndNamespace() throws IOException {
-               writeKeyWithGroupAndNamespace(
-                       backend.getCurrentKeyGroupIndex(),
-                       backend.getCurrentKey(),
-                       currentNamespace,
-                       keySerializationStream,
-                       keySerializationDataOutputView);
-       }
-
-       protected void writeKeyWithGroupAndNamespace(
-                       int keyGroup, K key, N namespace,
-                       ByteArrayOutputStreamWithPos keySerializationStream,
-                       DataOutputView keySerializationDataOutputView) throws IOException {
-
-               Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
-
-               keySerializationStream.reset();
-               writeKeyGroup(keyGroup, keySerializationDataOutputView);
-               writeKey(key, keySerializationStream, keySerializationDataOutputView);
-               writeNameSpace(namespace, keySerializationStream, keySerializationDataOutputView);
-       }
-
-       private void writeKeyGroup(
-                       int keyGroup,
-                       DataOutputView keySerializationDateDataOutputView) throws IOException {
-               for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) {
-                       keySerializationDateDataOutputView.writeByte(keyGroup >>> (i << 3));
-               }
-       }
-
-       private void writeKey(
-                       K key,
-                       ByteArrayOutputStreamWithPos keySerializationStream,
-                       DataOutputView keySerializationDataOutputView) throws IOException {
-               //write key
-               int beforeWrite = keySerializationStream.getPosition();
-               backend.getKeySerializer().serialize(key, keySerializationDataOutputView);
-
-               if (ambiguousKeyPossible) {
-                       //write size of key
-                       writeLengthFrom(beforeWrite, keySerializationStream,
-                               keySerializationDataOutputView);
-               }
-       }
-
-       private void writeNameSpace(
-                       N namespace,
-                       ByteArrayOutputStreamWithPos keySerializationStream,
-                       DataOutputView keySerializationDataOutputView) throws IOException {
-               int beforeWrite = keySerializationStream.getPosition();
-               namespaceSerializer.serialize(namespace, keySerializationDataOutputView);
-
-               if (ambiguousKeyPossible) {
-                       //write length of namespace
-                       writeLengthFrom(beforeWrite, keySerializationStream,
-                               keySerializationDataOutputView);
-               }
-       }
-
-       private static void writeLengthFrom(
-                       int fromPosition,
-                       ByteArrayOutputStreamWithPos keySerializationStream,
-                       DataOutputView keySerializationDateDataOutputView) throws IOException {
-               int length = keySerializationStream.getPosition() - fromPosition;
-               writeVariableIntBytes(length, keySerializationDateDataOutputView);
-       }
-
-       private static void writeVariableIntBytes(
-                       int value,
-                       DataOutputView keySerializationDateDataOutputView)
-                       throws IOException {
-               do {
-                       keySerializationDateDataOutputView.writeByte(value);
-                       value >>>= 8;
-               } while (value != 0);
-       }
-
-       protected Tuple3<Integer, K, N> readKeyWithGroupAndNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException {
-               int keyGroup = readKeyGroup(inputView);
-               K key = readKey(inputStream, inputView);
-               N namespace = readNamespace(inputStream, inputView);
-
-               return new Tuple3<>(keyGroup, key, namespace);
-       }
-
-       private int readKeyGroup(DataInputView inputView) throws IOException {
-               int keyGroup = 0;
-               for (int i = 0; i < backend.getKeyGroupPrefixBytes(); ++i) {
-                       keyGroup <<= 8;
-                       keyGroup |= (inputView.readByte() & 0xFF);
-               }
-               return keyGroup;
-       }
-
-       private K readKey(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException {
-               int beforeRead = inputStream.getPosition();
-               K key = backend.getKeySerializer().deserialize(inputView);
-               if (ambiguousKeyPossible) {
-                       int length = inputStream.getPosition() - beforeRead;
-                       readVariableIntBytes(inputView, length);
-               }
-               return key;
-       }
-
-       private N readNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException {
-               int beforeRead = inputStream.getPosition();
-               N namespace = namespaceSerializer.deserialize(inputView);
-               if (ambiguousKeyPossible) {
-                       int length = inputStream.getPosition() - beforeRead;
-                       readVariableIntBytes(inputView, length);
-               }
-               return namespace;
-       }
-
-       private void readVariableIntBytes(DataInputView inputView, int value) throws IOException {
-               do {
-                       inputView.readByte();
-                       value >>>= 8;
-               } while (value != 0);
-       }
-}
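
For orientation, the composite RocksDB key that writeKeyWithGroupAndNamespace
assembles above is laid out as

    [ key-group prefix (big-endian) | serialized key | key length* | serialized namespace | namespace length* ]

where the starred lengths are only appended when both the key and the
namespace serializers are variable-length ("ambiguousKeyPossible"), so the
key/namespace boundary stays unambiguous. A minimal standalone sketch of the
prefix encoding, assuming a hypothetical 2-byte prefix (illustrative code,
not part of the commit):

    import java.io.ByteArrayOutputStream;

    public class KeyGroupPrefixSketch {

        // Mirrors writeKeyGroup(...) above: emit the key-group id big-endian
        // over 'prefixBytes' bytes, most significant byte first.
        static byte[] keyGroupPrefix(int keyGroup, int prefixBytes) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            for (int i = prefixBytes; --i >= 0;) {
                out.write(keyGroup >>> (i << 3)); // shift by i * 8 bits
            }
            return out.toByteArray();
        }

        public static void main(String[] args) {
            // key-group 300 with a 2-byte prefix serializes to 0x01 0x2C
            byte[] prefix = keyGroupPrefix(300, 2);
            System.out.printf("0x%02X 0x%02X%n", prefix[0], prefix[1]);
        }
    }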

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
deleted file mode 100644
index 34f7f62..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.DBOptions;
-
-/**
- * A factory for {@link DBOptions} to be passed to the {@link RocksDBStateBackend}.
- * Options have to be created lazily by this factory, because the {@code Options}
- * class is not serializable and holds pointers to native code.
- *
- * <p>A typical pattern to use this OptionsFactory is as follows:
- *
- * <h3>Java 8:</h3>
- * <pre>{@code
- * rocksDbBackend.setOptions( (currentOptions) -> currentOptions.setMaxOpenFiles(1024) );
- * }</pre>
- *
- * <h3>Java 7:</h3>
- * <pre>{@code
- * rocksDbBackend.setOptions(new OptionsFactory() {
- *
- *     public Options setOptions(Options currentOptions) {
- *         return currentOptions.setMaxOpenFiles(1024);
- *     }
- * })
- * }</pre>
- */
-public interface OptionsFactory extends java.io.Serializable {
-
-       /**
-        * This method should set the additional options on top of the current options object.
-        * The current options object may contain pre-defined options based on flags that have
-        * been configured on the state backend.
-        *
-        * <p>It is important to set the options on the current object and return the result from
-        * the setter methods, otherwise the pre-defined options may get lost.
-        *
-        * @param currentOptions The options object with the pre-defined options.
-        * @return The options object on which the additional options are set.
-        */
-       DBOptions createDBOptions(DBOptions currentOptions);
-
-       /**
-        * This method should set the additional options on top of the current options object.
-        * The current options object may contain pre-defined options based on flags that have
-        * been configured on the state backend.
-        *
-        * <p>It is important to set the options on the current object and return the result from
-        * the setter methods, otherwise the pre-defined options may get lost.
-        *
-        * @param currentOptions The options object with the pre-defined options.
-        * @return The options object on which the additional options are set.
-        */
-       ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions);
-
-}
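
For reference, a minimal sketch of how both factory methods are typically
implemented against this interface (the checkpoint URI is a placeholder, and
handling of the constructor's declared IOException is omitted). Returning the
currentOptions object from both methods keeps the pre-defined options intact:

    RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
    rocksDbBackend.setOptions(new OptionsFactory() {

        @Override
        public DBOptions createDBOptions(DBOptions currentOptions) {
            // refine, rather than replace, the pre-defined DB options
            return currentOptions.setMaxOpenFiles(1024);
        }

        @Override
        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
            // refine, rather than replace, the pre-defined column family options
            return currentOptions.setWriteBufferSize(32 * 1024 * 1024);
        }
    });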

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
deleted file mode 100644
index 73dc0be..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.rocksdb.BlockBasedTableConfig;
-import org.rocksdb.BloomFilter;
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.DBOptions;
-
-/**
- * The {@code PredefinedOptions} are configuration settings for the {@link RocksDBStateBackend}.
- * The various pre-defined choices are configurations that have been empirically
- * determined to be beneficial for performance under different settings.
- *
- * <p>Some of these settings are based on experiments by the Flink community, some follow
- * guides from the RocksDB project.
- */
-public enum PredefinedOptions {
-
-       /**
-        * Default options for all settings, except that writes are not forced to the
-        * disk.
-        *
-        * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery,
-        * there is no need to sync data to stable storage.
-        */
-       DEFAULT {
-
-               @Override
-               public DBOptions createDBOptions() {
-                       return new DBOptions()
-                                       .setUseFsync(false);
-               }
-
-               @Override
-               public ColumnFamilyOptions createColumnOptions() {
-                       return new ColumnFamilyOptions();
-               }
-
-       },
-
-       /**
-        * Pre-defined options for regular spinning hard disks.
-        *
-        * <p>This constant configures RocksDB with some options that lead empirically
-        * to better performance when the machines executing the system use
-        * regular spinning hard disks.
-        *
-        * <p>The following options are set:
-        * <ul>
-        *     <li>setCompactionStyle(CompactionStyle.LEVEL)</li>
-        *     <li>setLevelCompactionDynamicLevelBytes(true)</li>
-        *     <li>setIncreaseParallelism(4)</li>
-        *     <li>setUseFsync(false)</li>
-        *     <li>setDisableDataSync(true)</li>
-        *     <li>setMaxOpenFiles(-1)</li>
-        * </ul>
-        *
-        * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery,
-        * there is no need to sync data to stable storage.
-        */
-       SPINNING_DISK_OPTIMIZED {
-
-               @Override
-               public DBOptions createDBOptions() {
-
-                       return new DBOptions()
-                                       .setIncreaseParallelism(4)
-                                       .setUseFsync(false)
-                                       .setMaxOpenFiles(-1);
-               }
-
-               @Override
-               public ColumnFamilyOptions createColumnOptions() {
-                       return new ColumnFamilyOptions()
-                                       .setCompactionStyle(CompactionStyle.LEVEL)
-                                       .setLevelCompactionDynamicLevelBytes(true);
-               }
-       },
-
-       /**
-        * Pre-defined options for better performance on regular spinning hard disks,
-        * at the cost of a higher memory consumption.
-        *
-        * <p><b>NOTE: These settings will cause RocksDB to consume a lot of memory for
-        * block caching and compactions. If you experience out-of-memory problems related to
-        * RocksDB, consider switching back to {@link #SPINNING_DISK_OPTIMIZED}.</b></p>
-        *
-        * <p>The following options are set:
-        * <ul>
-        *     <li>setLevelCompactionDynamicLevelBytes(true)</li>
-        *     <li>setTargetFileSizeBase(256 MBytes)</li>
-        *     <li>setMaxBytesForLevelBase(1 GByte)</li>
-        *     <li>setWriteBufferSize(64 MBytes)</li>
-        *     <li>setIncreaseParallelism(4)</li>
-        *     <li>setMinWriteBufferNumberToMerge(3)</li>
-        *     <li>setMaxWriteBufferNumber(4)</li>
-        *     <li>setUseFsync(false)</li>
-        *     <li>setMaxOpenFiles(-1)</li>
-        *     <li>BlockBasedTableConfig.setBlockCacheSize(256 MBytes)</li>
-        *     <li>BlockBasedTableConfig.setBlockSize(128 KBytes)</li>
-        * </ul>
-        *
-        * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery,
-        * there is no need to sync data to stable storage.
-        */
-       SPINNING_DISK_OPTIMIZED_HIGH_MEM {
-
-               @Override
-               public DBOptions createDBOptions() {
-
-                       return new DBOptions()
-                                       .setIncreaseParallelism(4)
-                                       .setUseFsync(false)
-                                       .setMaxOpenFiles(-1);
-               }
-
-               @Override
-               public ColumnFamilyOptions createColumnOptions() {
-
-                       final long blockCacheSize = 256 * 1024 * 1024;
-                       final long blockSize = 128 * 1024;
-                       final long targetFileSize = 256 * 1024 * 1024;
-                       final long writeBufferSize = 64 * 1024 * 1024;
-
-                       return new ColumnFamilyOptions()
-                                       .setCompactionStyle(CompactionStyle.LEVEL)
-                                       .setLevelCompactionDynamicLevelBytes(true)
-                                       .setTargetFileSizeBase(targetFileSize)
-                                       .setMaxBytesForLevelBase(4 * targetFileSize)
-                                       .setWriteBufferSize(writeBufferSize)
-                                       .setMinWriteBufferNumberToMerge(3)
-                                       .setMaxWriteBufferNumber(4)
-                                       .setTableFormatConfig(
-                                                       new BlockBasedTableConfig()
-                                                                       .setBlockCacheSize(blockCacheSize)
-                                                                       .setBlockSize(blockSize)
-                                                                       .setFilter(new BloomFilter())
-                                       );
-               }
-       },
-
-       /**
-        * Pre-defined options for Flash SSDs.
-        *
-        * <p>This constant configures RocksDB with some options that lead empirically
-        * to better performance when the machines executing the system use SSDs.
-        *
-        * <p>The following options are set:
-        * <ul>
-        *     <li>setIncreaseParallelism(4)</li>
-        *     <li>setUseFsync(false)</li>
-        *     <li>setDisableDataSync(true)</li>
-        *     <li>setMaxOpenFiles(-1)</li>
-        * </ul>
-        *
-        * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery,
-        * there is no need to sync data to stable storage.
-        */
-       FLASH_SSD_OPTIMIZED {
-
-               @Override
-               public DBOptions createDBOptions() {
-                       return new DBOptions()
-                                       .setIncreaseParallelism(4)
-                                       .setUseFsync(false)
-                                       .setMaxOpenFiles(-1);
-               }
-
-               @Override
-               public ColumnFamilyOptions createColumnOptions() {
-                       return new ColumnFamilyOptions();
-               }
-       };
-
-       // ------------------------------------------------------------------------
-
-       /**
-        * Creates the {@link DBOptions} for this pre-defined setting.
-        *
-        * @return The pre-defined options object.
-        */
-       public abstract DBOptions createDBOptions();
-
-       /**
-        * Creates the {@link org.rocksdb.ColumnFamilyOptions} for this pre-defined setting.
-        *
-        * @return The pre-defined options object.
-        */
-       public abstract ColumnFamilyOptions createColumnOptions();
-
-}
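
For reference, a pre-defined profile is selected on the backend via
setPredefinedOptions; an OptionsFactory set afterwards refines the chosen
profile rather than replacing it. A minimal sketch (the checkpoint URI is a
placeholder):

    RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend("file:///tmp/flink-checkpoints");
    rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);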

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
deleted file mode 100644
index 2c07814..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.functions.AggregateFunction;
-import org.apache.flink.api.common.state.AggregatingState;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.WriteOptions;
-
-import java.io.IOException;
-import java.util.Collection;
-
-/**
- * An {@link AggregatingState} implementation that stores state in RocksDB.
- *
- * @param <K> The type of the key
- * @param <N> The type of the namespace
- * @param <T> The type of the values that are aggregated into the state
- * @param <ACC> The type of the value stored in the state (the accumulator type)
- * @param <R> The type of the value returned from the state
- */
-public class RocksDBAggregatingState<K, N, T, ACC, R>
-       extends AbstractRocksDBState<K, N, AggregatingState<T, R>, AggregatingStateDescriptor<T, ACC, R>, ACC>
-       implements InternalAggregatingState<N, T, R> {
-
-       /** Serializer for the values. */
-       private final TypeSerializer<ACC> valueSerializer;
-
-       /** User-specified aggregation function. */
-       private final AggregateFunction<T, ACC, R> aggFunction;
-
-       /**
-        * We disable writes to the write-ahead-log here. We can't have these in the base class
-        * because JNI segfaults for some reason if they are.
-        */
-       private final WriteOptions writeOptions;
-
-       /**
-        * Creates a new {@code RocksDBAggregatingState}.
-        *
-        * @param namespaceSerializer
-        *             The serializer for the namespace.
-        * @param stateDesc
-        *             The state identifier for the state. This contains the state name and aggregation function.
-        */
-       public RocksDBAggregatingState(
-                       ColumnFamilyHandle columnFamily,
-                       TypeSerializer<N> namespaceSerializer,
-                       AggregatingStateDescriptor<T, ACC, R> stateDesc,
-                       RocksDBKeyedStateBackend<K> backend) {
-
-               super(columnFamily, namespaceSerializer, stateDesc, backend);
-
-               this.valueSerializer = stateDesc.getSerializer();
-               this.aggFunction = stateDesc.getAggregateFunction();
-
-               writeOptions = new WriteOptions();
-               writeOptions.setDisableWAL(true);
-       }
-
-       @Override
-       public R get() throws IOException {
-               try {
-                       // prepare the current key and namespace for RocksDB lookup
-                       writeCurrentKeyWithGroupAndNamespace();
-                       final byte[] key = keySerializationStream.toByteArray();
-
-                       // get the current value
-                       final byte[] valueBytes = backend.db.get(columnFamily, key);
-
-                       if (valueBytes == null) {
-                               return null;
-                       }
-
-                       ACC accumulator = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
-                       return aggFunction.getResult(accumulator);
-               }
-               catch (IOException | RocksDBException e) {
-                       throw new IOException("Error while retrieving value from RocksDB", e);
-               }
-       }
-
-       @Override
-       public void add(T value) throws IOException {
-               try {
-                       // prepare the current key and namespace for RocksDB lookup
-                       writeCurrentKeyWithGroupAndNamespace();
-                       final byte[] key = keySerializationStream.toByteArray();
-                       keySerializationStream.reset();
-
-                       // get the current value
-                       final byte[] valueBytes = backend.db.get(columnFamily, key);
-
-                       // deserialize the current accumulator, or create a blank one
-                       ACC accumulator = valueBytes == null ?
-                                       aggFunction.createAccumulator() :
-                                       valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
-
-                       // aggregate the value into the accumulator
-                       accumulator = aggFunction.add(value, accumulator);
-
-                       // serialize the new accumulator
-                       final DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
-                       valueSerializer.serialize(accumulator, out);
-
-                       // write the new value to RocksDB
-                       backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
-               }
-               catch (IOException | RocksDBException e) {
-                       throw new IOException("Error while adding value to RocksDB", e);
-               }
-       }
-
-       @Override
-       public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
-               if (sources == null || sources.isEmpty()) {
-                       return;
-               }
-
-               // cache key and namespace
-               final K key = backend.getCurrentKey();
-               final int keyGroup = backend.getCurrentKeyGroupIndex();
-
-               try {
-                       ACC current = null;
-
-                       // merge the sources to the target
-                       for (N source : sources) {
-                               if (source != null) {
-                                       writeKeyWithGroupAndNamespace(
-                                                       keyGroup, key, source,
-                                                       keySerializationStream, keySerializationDataOutputView);
-
-                                       final byte[] sourceKey = keySerializationStream.toByteArray();
-                                       final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
-                                       backend.db.delete(columnFamily, sourceKey);
-
-                                       if (valueBytes != null) {
-                                               ACC value = valueSerializer.deserialize(
-                                                               new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
-
-                                               if (current != null) {
-                                                       current = aggFunction.merge(current, value);
-                                               }
-                                               else {
-                                                       current = value;
-                                               }
-                                       }
-                               }
-                       }
-
-                       // if something came out of merging the sources, merge it or write it to the target
-                       if (current != null) {
-                               // create the target full-binary-key
-                               writeKeyWithGroupAndNamespace(
-                                               keyGroup, key, target,
-                                               keySerializationStream, keySerializationDataOutputView);
-
-                               final byte[] targetKey = keySerializationStream.toByteArray();
-                               final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);
-
-                               if (targetValueBytes != null) {
-                                       // target also had a value, merge
-                                       ACC value = valueSerializer.deserialize(
-                                                       new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(targetValueBytes)));
-
-                                       current = aggFunction.merge(current, value);
-                               }
-
-                               // serialize the resulting value
-                               keySerializationStream.reset();
-                               valueSerializer.serialize(current, keySerializationDataOutputView);
-
-                               // write the resulting value
-                               backend.db.put(columnFamily, writeOptions, targetKey, keySerializationStream.toByteArray());
-                       }
-               }
-               catch (Exception e) {
-                       throw new Exception("Error while merging state in RocksDB", e);
-               }
-       }
-}
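
For orientation, the state above is created from an AggregatingStateDescriptor.
A minimal sketch of a descriptor whose accumulator keeps a running (sum, count)
pair and returns the average from getResult() (illustrative code, not part of
the commit; the names are hypothetical):

    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;

    AggregateFunction<Long, Tuple2<Long, Long>, Double> avg =
        new AggregateFunction<Long, Tuple2<Long, Long>, Double>() {

            @Override
            public Tuple2<Long, Long> createAccumulator() {
                return Tuple2.of(0L, 0L);
            }

            @Override
            public Tuple2<Long, Long> add(Long value, Tuple2<Long, Long> acc) {
                return Tuple2.of(acc.f0 + value, acc.f1 + 1);
            }

            @Override
            public Double getResult(Tuple2<Long, Long> acc) {
                return acc.f1 == 0 ? 0.0 : acc.f0 / (double) acc.f1;
            }

            @Override
            public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
            }
        };

    AggregatingStateDescriptor<Long, Tuple2<Long, Long>, Double> descriptor =
        new AggregatingStateDescriptor<>(
            "average",
            avg,
            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));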

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
deleted file mode 100644
index 479565e..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.internal.InternalFoldingState;
-
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.WriteOptions;
-
-import java.io.IOException;
-
-/**
- * {@link FoldingState} implementation that stores state in RocksDB.
- *
- * @param <K> The type of the key.
- * @param <N> The type of the namespace.
- * @param <T> The type of the values that can be folded into the state.
- * @param <ACC> The type of the value in the folding state.
- *
- * @deprecated will be removed in a future version
- */
-@Deprecated
-public class RocksDBFoldingState<K, N, T, ACC>
-       extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, ACC>
-       implements InternalFoldingState<N, T, ACC> {
-
-       /** Serializer for the values. */
-       private final TypeSerializer<ACC> valueSerializer;
-
-       /** User-specified fold function. */
-       private final FoldFunction<T, ACC> foldFunction;
-
-       /**
-        * We disable writes to the write-ahead-log here. We can't have these in the base class
-        * because JNI segfaults for some reason if they are.
-        */
-       private final WriteOptions writeOptions;
-
-       /**
-        * Creates a new {@code RocksDBFoldingState}.
-        *
-        * @param namespaceSerializer The serializer for the namespace.
-        * @param stateDesc The state identifier for the state. This contains name
-        *                     and can create a default state value.
-        */
-       public RocksDBFoldingState(ColumnFamilyHandle columnFamily,
-                       TypeSerializer<N> namespaceSerializer,
-                       FoldingStateDescriptor<T, ACC> stateDesc,
-                       RocksDBKeyedStateBackend<K> backend) {
-
-               super(columnFamily, namespaceSerializer, stateDesc, backend);
-
-               this.valueSerializer = stateDesc.getSerializer();
-               this.foldFunction = stateDesc.getFoldFunction();
-
-               writeOptions = new WriteOptions();
-               writeOptions.setDisableWAL(true);
-       }
-
-       @Override
-       public ACC get() {
-               try {
-                       writeCurrentKeyWithGroupAndNamespace();
-                       byte[] key = keySerializationStream.toByteArray();
-                       byte[] valueBytes = backend.db.get(columnFamily, key);
-                       if (valueBytes == null) {
-                               return null;
-                       }
-                       return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
-               } catch (IOException | RocksDBException e) {
-                       throw new RuntimeException("Error while retrieving data from RocksDB", e);
-               }
-       }
-
-       @Override
-       public void add(T value) throws IOException {
-               try {
-                       writeCurrentKeyWithGroupAndNamespace();
-                       byte[] key = keySerializationStream.toByteArray();
-                       byte[] valueBytes = backend.db.get(columnFamily, key);
-                       DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
-                       if (valueBytes == null) {
-                               keySerializationStream.reset();
-                               valueSerializer.serialize(foldFunction.fold(stateDesc.getDefaultValue(), value), out);
-                               backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
-                       } else {
-                               ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
-                               ACC newValue = foldFunction.fold(oldValue, value);
-                               keySerializationStream.reset();
-                               valueSerializer.serialize(newValue, out);
-                               backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
-                       }
-               } catch (Exception e) {
-                       throw new RuntimeException("Error while adding data to RocksDB", e);
-               }
-       }
-
-}
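
For completeness, a minimal sketch of how a descriptor for this (deprecated)
state type is created; new code should prefer AggregatingState, since
FoldingState is slated for removal. Illustrative code, not part of the commit:

    import org.apache.flink.api.common.functions.FoldFunction;
    import org.apache.flink.api.common.state.FoldingStateDescriptor;

    FoldingStateDescriptor<Long, String> descriptor = new FoldingStateDescriptor<>(
        "fold-log",                 // state name
        "start",                    // initial accumulator value
        (FoldFunction<Long, String>) (acc, value) -> acc + "," + value,
        String.class);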
