[FLINK-3278] Add Partitioned State Backend Based on RocksDB
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/524e56bc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/524e56bc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/524e56bc Branch: refs/heads/master Commit: 524e56bcbf9faf4b7363abfefeb3b3fa53949bad Parents: 67ca4a4 Author: Aljoscha Krettek <[email protected]> Authored: Thu Jan 21 10:56:47 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Wed Feb 3 20:27:51 2016 +0100 ---------------------------------------------------------------------- .../flink-statebackend-rocksdb/pom.xml | 71 ++++ .../streaming/state/AbstractRocksDBState.java | 372 +++++++++++++++++++ .../streaming/state/RocksDBListState.java | 183 +++++++++ .../streaming/state/RocksDBReducingState.java | 190 ++++++++++ .../streaming/state/RocksDBStateBackend.java | 127 +++++++ .../streaming/state/RocksDBValueState.java | 156 ++++++++ .../state/RocksDBStateBackendTest.java | 53 +++ .../src/test/resources/log4j-test.properties | 27 ++ .../src/test/resources/log4j.properties | 27 ++ .../src/test/resources/logback-test.xml | 30 ++ flink-contrib/pom.xml | 1 + .../flink/util/ExternalProcessRunner.java | 233 ++++++++++++ .../apache/flink/util/HDFSCopyFromLocal.java | 48 +++ .../org/apache/flink/util/HDFSCopyToLocal.java | 49 +++ .../flink/util/ExternalProcessRunnerTest.java | 98 +++++ flink-tests/pom.xml | 7 + .../EventTimeWindowCheckpointingITCase.java | 9 + 17 files changed, 1681 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/pom.xml ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml b/flink-contrib/flink-statebackend-rocksdb/pom.xml new file mode 100644 index 0000000..999c496 --- /dev/null +++ b/flink-contrib/flink-statebackend-rocksdb/pom.xml @@ -0,0 +1,71 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-contrib</artifactId> + <version>1.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-statebackend-rocksdb_2.10</artifactId> + <name>flink-statebackend-rocksdb</name> + + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_2.10</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + <dependency> + <groupId>org.rocksdb</groupId> + <artifactId>rocksdbjni</artifactId> + <version>4.1.0</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java new file mode 100644 index 0000000..6dbe16c --- /dev/null +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -0,0 +1,372 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.contrib.streaming.state; + +import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KvState; +import org.apache.flink.runtime.state.KvStateSnapshot; +import org.apache.flink.util.HDFSCopyFromLocal; +import org.apache.flink.util.HDFSCopyToLocal; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.rocksdb.BackupEngine; +import org.rocksdb.BackupableDBOptions; +import org.rocksdb.Env; +import org.rocksdb.Options; +import org.rocksdb.RestoreOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.StringAppendOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.net.URI; + +import static java.util.Objects.requireNonNull; + +/** + * Base class for {@link State} implementations that store state in a RocksDB database. + * + * <p>This base class is responsible for setting up the RocksDB database, for + * checkpointing/restoring the database and for disposal in the {@link #dispose()} method. The + * concrete subclasses just use the RocksDB handle to store/retrieve state. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <S> The type of {@link State}. + * @param <SD> The type of {@link StateDescriptor}. + * @param <Backend> The type of the backend that snapshots this key/value state. + */ +public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend> + implements KvState<K, N, S, SD, Backend>, State { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBState.class); + + /** Serializer for the keys */ + protected final TypeSerializer<K> keySerializer; + + /** Serializer for the namespace */ + protected final TypeSerializer<N> namespaceSerializer; + + /** The current key, which the next value methods will refer to */ + protected K currentKey; + + /** The current namespace, which the next value methods will refer to */ + protected N currentNamespace; + + /** Store it so that we can clean up in dispose() */ + protected final File dbPath; + + protected final String checkpointPath; + + /** Our RocksDB instance */ + protected final RocksDB db; + + /** + * Creates a new RocksDB backed state. + * + * @param keySerializer The serializer for the keys. + * @param namespaceSerializer The serializer for the namespace. + * @param dbPath The path on the local system where RocksDB data should be stored. 
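+ * @param checkpointPath The path on the checkpoint filesystem where backups of the database are stored during snapshots.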
 */ + protected AbstractRocksDBState(TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + File dbPath, + String checkpointPath) { + this.keySerializer = requireNonNull(keySerializer); + this.namespaceSerializer = namespaceSerializer; + this.dbPath = dbPath; + this.checkpointPath = checkpointPath; + + RocksDB.loadLibrary(); + + Options options = new Options().setCreateIfMissing(true); + options.setMergeOperator(new StringAppendOperator()); + + if (!dbPath.exists()) { + if (!dbPath.mkdirs()) { + throw new RuntimeException("Could not create RocksDB data directory."); + } + } + + // delete a possibly pre-existing "db" sub-directory; RocksDB recreates it on open + try { + File db = new File(dbPath, "db"); + LOG.warn("Deleting already existing db directory {}.", db); + FileUtils.deleteDirectory(db); + } catch (IOException e) { + throw new RuntimeException("Error cleaning RocksDB data directory.", e); + } + + try { + db = RocksDB.open(options, new File(dbPath, "db").getAbsolutePath()); + } catch (RocksDBException e) { + throw new RuntimeException("Error while opening RocksDB instance.", e); + } + + options.dispose(); + + } + + /** + * Creates a new RocksDB backed state and restores from the given backup directory. After + * restoring, the backup directory is deleted. + * + * @param keySerializer The serializer for the keys. + * @param namespaceSerializer The serializer for the namespace. + * @param dbPath The path on the local system where RocksDB data should be stored. + * @param checkpointPath The path on the checkpoint filesystem where backups of the database are stored during snapshots. + * @param restorePath The path to a backup directory from which to restore the RocksDB database. + */ + protected AbstractRocksDBState(TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + File dbPath, + String checkpointPath, + String restorePath) { + + RocksDB.loadLibrary(); + + try { + BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), new BackupableDBOptions(restorePath + "/")); + backupEngine.restoreDbFromLatestBackup(new File(dbPath, "db").getAbsolutePath(), new File(dbPath, "db").getAbsolutePath(), new RestoreOptions(true)); + FileUtils.deleteDirectory(new File(restorePath)); + } catch (RocksDBException|IOException|IllegalArgumentException e) { + throw new RuntimeException("Error while restoring RocksDB state from " + restorePath, e); + } + + this.keySerializer = requireNonNull(keySerializer); + this.namespaceSerializer = namespaceSerializer; + this.dbPath = dbPath; + this.checkpointPath = checkpointPath; + + Options options = new Options().setCreateIfMissing(true); + options.setMergeOperator(new StringAppendOperator()); + + if (!dbPath.exists()) { + if (!dbPath.mkdirs()) { + throw new RuntimeException("Could not create RocksDB data directory."); + } + } + + try { + db = RocksDB.open(options, new File(dbPath, "db").getAbsolutePath()); + } catch (RocksDBException e) { + throw new RuntimeException("Error while opening RocksDB instance.", e); + } + + options.dispose(); + } + + // ------------------------------------------------------------------------ + + @Override + final public void clear() { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); + try { + writeKeyAndNamespace(out); + byte[] key = baos.toByteArray(); + db.remove(key); + } catch (IOException|RocksDBException e) { + throw new RuntimeException("Error while removing entry from RocksDB", e); + } + } + + protected void writeKeyAndNamespace(DataOutputView out) throws IOException { + keySerializer.serialize(currentKey, out); + 
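// write a single separator byte between the serialized key and namespace (the concrete value 42 is arbitrary) + 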
out.writeByte(42); + namespaceSerializer.serialize(currentNamespace, out); + } + + @Override + final public void setCurrentKey(K currentKey) { + this.currentKey = currentKey; + } + + @Override + final public void setCurrentNamespace(N namespace) { + this.currentNamespace = namespace; + } + + protected abstract KvStateSnapshot<K, N, S, SD, Backend> createRocksDBSnapshot(URI backupUri, long checkpointId); + + @Override + final public KvStateSnapshot<K, N, S, SD, Backend> snapshot( + long checkpointId, + long timestamp) throws Exception { + boolean success = false; + + final File localBackupPath = new File(dbPath, "backup-" + checkpointId); + final URI backupUri = new URI(checkpointPath + "/chk-" + checkpointId); + + try { + if (!localBackupPath.exists()) { + if (!localBackupPath.mkdirs()) { + throw new RuntimeException("Could not create local backup path " + localBackupPath); + } + } + + BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), + new BackupableDBOptions(localBackupPath.getAbsolutePath())); + + backupEngine.createNewBackup(db); + + HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri); + KvStateSnapshot<K, N, S, SD, Backend> result = createRocksDBSnapshot(backupUri, checkpointId); + success = true; + return result; + } finally { + FileUtils.deleteDirectory(localBackupPath); + if (!success) { + FileSystem fs = FileSystem.get(backupUri, new Configuration()); + fs.delete(new Path(backupUri), true); + } + } + } + + @Override + final public void dispose() { + db.dispose(); + try { + FileUtils.deleteDirectory(dbPath); + } catch (IOException e) { + throw new RuntimeException("Error disposing RocksDB data directory.", e); + } + } + + public static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend> implements KvStateSnapshot<K, N, S, SD, Backend> { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSnapshot.class); + + // ------------------------------------------------------------------------ + // Ctor parameters for RocksDB state + // ------------------------------------------------------------------------ + + /** The local path where the RocksDB database is restored to */ + protected final File dbPath; + + /** Where we should put RocksDB backups */ + protected final String checkpointPath; + + // ------------------------------------------------------------------------ + // Info about this checkpoint + // ------------------------------------------------------------------------ + + protected final URI backupUri; + + protected long checkpointId; + + // ------------------------------------------------------------------------ + // For sanity checks + // ------------------------------------------------------------------------ + + /** Key serializer */ + protected final TypeSerializer<K> keySerializer; + + /** Namespace serializer */ + protected final TypeSerializer<N> namespaceSerializer; + + /** The state descriptor, for sanity checks */ + protected final SD stateDesc; + + public AbstractRocksDBSnapshot(File dbPath, + String checkpointPath, + URI backupUri, + long checkpointId, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + SD stateDesc) { + this.dbPath = dbPath; + this.checkpointPath = checkpointPath; + this.backupUri = backupUri; + this.checkpointId = checkpointId; + + this.stateDesc = stateDesc; + this.keySerializer = keySerializer; + this.namespaceSerializer = namespaceSerializer; + } + + 
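/** + * Creates the concrete key/value state and restores its contents from the backup at the + * given restore path. Called from {@code restoreState(...)} after the backup has been + * copied from the checkpoint filesystem to the local disk. + */ + 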
protected abstract KvState<K, N, S, SD, Backend> createRocksDBState(TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + SD stateDesc, + File dbPath, + String backupPath, + String restorePath) throws Exception; + + @Override + public final KvState<K, N, S, SD, Backend> restoreState( + Backend stateBackend, + TypeSerializer<K> keySerializer, + ClassLoader classLoader, + long recoveryTimestamp) throws Exception { + + // validity checks + if (!this.keySerializer.equals(keySerializer)) { + throw new IllegalArgumentException( + "Cannot restore the state from the snapshot with the given serializers. " + + "State (K/V) was serialized with " + + "(" + this.keySerializer + ") " + + "now is (" + keySerializer + ")"); + } + + if (!dbPath.exists()) { + if (!dbPath.mkdirs()) { + throw new RuntimeException("Could not create RocksDB base path " + dbPath); + } + } + + FileSystem fs = FileSystem.get(backupUri, new Configuration()); + + final File localBackupPath = new File(dbPath, "chk-" + checkpointId); + + if (localBackupPath.exists()) { + try { + LOG.warn("Deleting already existing local backup directory {}.", localBackupPath); + FileUtils.deleteDirectory(localBackupPath); + } catch (IOException e) { + throw new RuntimeException("Error cleaning RocksDB local backup directory.", e); + } + } + + HDFSCopyToLocal.copyToLocal(backupUri, dbPath); + return createRocksDBState(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, localBackupPath.getAbsolutePath()); + } + + @Override + public final void discardState() throws Exception { + FileSystem fs = FileSystem.get(backupUri, new Configuration()); + fs.delete(new Path(backupUri), true); + } + + @Override + public final long getStateSize() throws Exception { + return 0; + } + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java new file mode 100644 index 0000000..e97e65d --- /dev/null +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KvState; +import org.apache.flink.runtime.state.KvStateSnapshot; +import org.rocksdb.RocksDBException; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * {@link ListState} implementation that stores state in RocksDB. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <V> The type of the values in the list state. + * @param <Backend> The type of the backend that snapshots this key/value state. + */ +public class RocksDBListState<K, N, V, Backend extends AbstractStateBackend> + extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>, Backend> + implements ListState<V> { + + /** Serializer for the values */ + private final TypeSerializer<V> valueSerializer; + + /** This holds the name of the state and can create an initial default value for the state. */ + protected final ListStateDescriptor<V> stateDesc; + + /** + * Creates a new {@code RocksDBListState}. + * + * @param keySerializer The serializer for the keys. + * @param namespaceSerializer The serializer for the namespace. + * @param stateDesc The state identifier for the state. This contains name + * and can create a default state value. + * @param dbPath The path on the local system where RocksDB data should be stored. + */ + protected RocksDBListState(TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + ListStateDescriptor<V> stateDesc, + File dbPath, + String backupPath) { + super(keySerializer, namespaceSerializer, dbPath, backupPath); + this.stateDesc = requireNonNull(stateDesc); + this.valueSerializer = stateDesc.getSerializer(); + } + + /** + * Creates a new {@code RocksDBListState}. + * + * @param keySerializer The serializer for the keys. + * @param namespaceSerializer The serializer for the namespace. + * @param stateDesc The state identifier for the state. This contains name + * and can create a default state value. + * @param dbPath The path on the local system where RocksDB data should be stored. 
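+ * @param backupPath The path where backups of the database are stored during checkpointing. + * @param restorePath The path to a backup directory from which to restore the RocksDB database.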
+ */ + protected RocksDBListState(TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + ListStateDescriptor<V> stateDesc, + File dbPath, + String backupPath, + String restorePath) { + super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath); + this.stateDesc = requireNonNull(stateDesc); + this.valueSerializer = stateDesc.getSerializer(); + } + + @Override + public Iterable<V> get() { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); + try { + writeKeyAndNamespace(out); + byte[] key = baos.toByteArray(); + byte[] valueBytes = db.get(key); + + if (valueBytes == null) { + return Collections.emptyList(); + } + + ByteArrayInputStream bais = new ByteArrayInputStream(valueBytes); + DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais); + + List<V> result = new ArrayList<>(); + while (in.available() > 0) { + result.add(valueSerializer.deserialize(in)); + if (in.available() > 0) { + in.readByte(); + } + } + return result; + } catch (IOException|RocksDBException e) { + throw new RuntimeException("Error while retrieving data from RocksDB", e); + } + } + + @Override + public void add(V value) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); + try { + writeKeyAndNamespace(out); + byte[] key = baos.toByteArray(); + + baos.reset(); + + valueSerializer.serialize(value, out); + db.merge(key, baos.toByteArray()); + + } catch (Exception e) { + throw new RuntimeException("Error while adding data to RocksDB", e); + } + } + + @Override + protected KvStateSnapshot<K, N, ListState<V>, ListStateDescriptor<V>, Backend> createRocksDBSnapshot( + URI backupUri, + long checkpointId) { + return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc); + } + + private static class Snapshot<K, N, V, Backend extends AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, ListState<V>, ListStateDescriptor<V>, Backend> { + private static final long serialVersionUID = 1L; + + public Snapshot(File dbPath, + String checkpointPath, + URI backupUri, + long checkpointId, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + ListStateDescriptor<V> stateDesc) { + super(dbPath, + checkpointPath, + backupUri, + checkpointId, + keySerializer, + namespaceSerializer, + stateDesc); + } + + @Override + protected KvState<K, N, ListState<V>, ListStateDescriptor<V>, Backend> createRocksDBState( + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + ListStateDescriptor<V> stateDesc, + File dbPath, + String backupPath, + String restorePath) throws Exception { + return new RocksDBListState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath); + } + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java new file mode 100644 index 0000000..eb21c3b --- /dev/null +++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KvState; +import org.apache.flink.runtime.state.KvStateSnapshot; +import org.rocksdb.RocksDBException; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.net.URI; + +import static java.util.Objects.requireNonNull; + +/** + * {@link ReducingState} implementation that stores state in RocksDB. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <V> The type of value that the state stores. + * @param <Backend> The type of the backend that snapshots this key/value state. + */ +public class RocksDBReducingState<K, N, V, Backend extends AbstractStateBackend> + extends AbstractRocksDBState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, Backend> + implements ReducingState<V> { + + /** Serializer for the values */ + private final TypeSerializer<V> valueSerializer; + + /** This holds the name of the state and can create an initial default value for the state. */ + protected final ReducingStateDescriptor<V> stateDesc; + + /** User-specified reduce function */ + private final ReduceFunction<V> reduceFunction; + + /** + * Creates a new {@code RocksDBReducingState}. 
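+ * This constructor creates a fresh, empty database; the second constructor restores the database from a previously written backup.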
+ * + * @param keySerializer The serializer for the keys. + * @param namespaceSerializer The serializer for the namespace. + * @param stateDesc The state identifier for the state. This contains name + * and can create a default state value. + * @param dbPath The path on the local system where RocksDB data should be stored. + */ + protected RocksDBReducingState(TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + ReducingStateDescriptor<V> stateDesc, + File dbPath, + String backupPath) { + super(keySerializer, namespaceSerializer, dbPath, backupPath); + this.stateDesc = requireNonNull(stateDesc); + this.valueSerializer = stateDesc.getSerializer(); + this.reduceFunction = stateDesc.getReduceFunction(); + } + + protected RocksDBReducingState(TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + ReducingStateDescriptor<V> stateDesc, + File dbPath, + String backupPath, + String restorePath) { + super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath); + this.stateDesc = stateDesc; + this.valueSerializer = stateDesc.getSerializer(); + this.reduceFunction = stateDesc.getReduceFunction(); + } + + @Override + public V get() { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); + try { + writeKeyAndNamespace(out); + byte[] key = baos.toByteArray(); + byte[] valueBytes = db.get(key); + if (valueBytes == null) { + return null; + } + return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes))); + } catch (IOException|RocksDBException e) { + throw new RuntimeException("Error while retrieving data from RocksDB", e); + } + } + + @Override + public void add(V value) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); + try { + writeKeyAndNamespace(out); + byte[] key = baos.toByteArray(); + byte[] valueBytes = db.get(key); + + if (valueBytes == null) { + baos.reset(); + valueSerializer.serialize(value, out); + db.put(key, baos.toByteArray()); + } else { + V oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes))); + V newValue = reduceFunction.reduce(oldValue, value); + baos.reset(); + valueSerializer.serialize(newValue, out); + db.put(key, baos.toByteArray()); + } + } catch (Exception e) { + throw new RuntimeException("Error while adding data to RocksDB", e); + } + } + + @Override + protected KvStateSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>, Backend> createRocksDBSnapshot( + URI backupUri, + long checkpointId) { + return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc); + } + + private static class Snapshot<K, N, V, Backend extends AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>, Backend> { + private static final long serialVersionUID = 1L; + + public Snapshot(File dbPath, + String checkpointPath, + URI backupUri, + long checkpointId, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + ReducingStateDescriptor<V> stateDesc) { + super(dbPath, + checkpointPath, + backupUri, + checkpointId, + keySerializer, + namespaceSerializer, + stateDesc); + } + + @Override + protected KvState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, Backend> createRocksDBState( + TypeSerializer<K> keySerializer, + 
TypeSerializer<N> namespaceSerializer, + ReducingStateDescriptor<V> stateDesc, + File dbPath, + String backupPath, + String restorePath) throws Exception { + return new RocksDBReducingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath); + } + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java new file mode 100644 index 0000000..aaaeea4 --- /dev/null +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import java.io.File; +import java.io.Serializable; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.StateHandle; + +import static java.util.Objects.requireNonNull; + +/** + * An {@link AbstractStateBackend} that stores its partitioned (key/value) state in local + * RocksDB databases and snapshots backups of them to a checkpoint directory. Non-partitioned + * state and checkpoint streams are delegated to a backing state backend. + */ +public class RocksDBStateBackend extends AbstractStateBackend { + private static final long serialVersionUID = 1L; + + /** Base path for RocksDB directory. */ + private final String dbBasePath; + + /** The checkpoint directory that we snapshot RocksDB backups to. */ + private final String checkpointDirectory; + + /** Operator identifier that is used to uniquify the RocksDB storage path. */ + private String operatorIdentifier; + + /** JobID for uniquifying backup paths. 
*/ + private JobID jobId; + + private AbstractStateBackend backingStateBackend; + + public RocksDBStateBackend(String dbBasePath, String checkpointDirectory, AbstractStateBackend backingStateBackend) { + this.dbBasePath = requireNonNull(dbBasePath); + this.checkpointDirectory = requireNonNull(checkpointDirectory); + this.backingStateBackend = requireNonNull(backingStateBackend); + } + + @Override + public void initializeForJob(Environment env, + String operatorIdentifier, + TypeSerializer<?> keySerializer) throws Exception { + super.initializeForJob(env, operatorIdentifier, keySerializer); + this.operatorIdentifier = operatorIdentifier.replace(" ", ""); + backingStateBackend.initializeForJob(env, operatorIdentifier, keySerializer); + this.jobId = env.getJobID(); + } + + @Override + public void disposeAllStateForCurrentJob() throws Exception { + + } + + @Override + public void close() throws Exception { + + } + + private File getDbPath(String stateName) { + return new File(new File(new File(new File(dbBasePath), jobId.toShortString()), operatorIdentifier), stateName); + } + + private String getCheckpointPath(String stateName) { + return checkpointDirectory + "/" + jobId.toShortString() + "/" + operatorIdentifier + "/" + stateName; + } + + @Override + protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, + ValueStateDescriptor<T> stateDesc) throws Exception { + File dbPath = getDbPath(stateDesc.getName()); + String checkpointPath = getCheckpointPath(stateDesc.getName()); + return new RocksDBValueState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath); + } + + @Override + protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, + ListStateDescriptor<T> stateDesc) throws Exception { + File dbPath = getDbPath(stateDesc.getName()); + String checkpointPath = getCheckpointPath(stateDesc.getName()); + return new RocksDBListState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath); + } + + @Override + protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, + ReducingStateDescriptor<T> stateDesc) throws Exception { + File dbPath = getDbPath(stateDesc.getName()); + String checkpointPath = getCheckpointPath(stateDesc.getName()); + return new RocksDBReducingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath); + } + + @Override + public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, + long timestamp) throws Exception { + return backingStateBackend.createCheckpointStateOutputStream(checkpointID, timestamp); + } + + @Override + public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S state, + long checkpointID, + long timestamp) throws Exception { + return backingStateBackend.checkpointStateSerializable(state, checkpointID, timestamp); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java new file mode 100644 index 0000000..8767a86 --- /dev/null +++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KvState; +import org.apache.flink.runtime.state.KvStateSnapshot; +import org.rocksdb.RocksDBException; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.net.URI; + +import static java.util.Objects.requireNonNull; + +/** + * {@link ValueState} implementation that stores state in RocksDB. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <V> The type of value that the state stores. + * @param <Backend> The type of the backend that snapshots this key/value state. + */ +public class RocksDBValueState<K, N, V, Backend extends AbstractStateBackend> + extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend> + implements ValueState<V> { + + /** Serializer for the values */ + private final TypeSerializer<V> valueSerializer; + + /** This holds the name of the state and can create an initial default value for the state. */ + protected final ValueStateDescriptor<V> stateDesc; + + /** + * Creates a new {@code RocksDBValueState}. + * + * @param keySerializer The serializer for the keys. + * @param namespaceSerializer The serializer for the namespace. + * @param stateDesc The state identifier for the state. This contains name + * and can create a default state value. + * @param dbPath The path on the local system where RocksDB data should be stored. 
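+ * @param backupPath The path where backups of the database are stored during checkpointing.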
+ */ + protected RocksDBValueState(TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + ValueStateDescriptor<V> stateDesc, + File dbPath, + String backupPath) { + super(keySerializer, namespaceSerializer, dbPath, backupPath); + this.stateDesc = requireNonNull(stateDesc); + this.valueSerializer = stateDesc.getSerializer(); + } + + protected RocksDBValueState(TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + ValueStateDescriptor<V> stateDesc, + File dbPath, + String backupPath, + String restorePath) { + super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath); + this.stateDesc = stateDesc; + this.valueSerializer = stateDesc.getSerializer(); + } + + @Override + public V value() { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); + try { + writeKeyAndNamespace(out); + byte[] key = baos.toByteArray(); + byte[] valueBytes = db.get(key); + if (valueBytes == null) { + return stateDesc.getDefaultValue(); + } + return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes))); + } catch (IOException|RocksDBException e) { + throw new RuntimeException("Error while retrieving data from RocksDB", e); + } + } + + @Override + public void update(V value) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); + try { + writeKeyAndNamespace(out); + byte[] key = baos.toByteArray(); + baos.reset(); + valueSerializer.serialize(value, out); + db.put(key, baos.toByteArray()); + } catch (Exception e) { + throw new RuntimeException("Error while adding data to RocksDB", e); + } + } + + @Override + protected KvStateSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend> createRocksDBSnapshot( + URI backupUri, + long checkpointId) { + return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc); + } + + private static class Snapshot<K, N, V, Backend extends AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend> { + private static final long serialVersionUID = 1L; + + public Snapshot(File dbPath, + String checkpointPath, + URI backupUri, + long checkpointId, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + ValueStateDescriptor<V> stateDesc) { + super(dbPath, + checkpointPath, + backupUri, + checkpointId, + keySerializer, + namespaceSerializer, + stateDesc); + } + + @Override + protected KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend> createRocksDBState( + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + ValueStateDescriptor<V> stateDesc, + File dbPath, + String backupPath, + String restorePath) throws Exception { + return new RocksDBValueState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath); + } + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java new file mode 100644 index 0000000..3b3ac31 --- /dev/null +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.commons.io.FileUtils; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.state.StateBackendTestBase; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; + +/** + * Tests for the partitioned state part of {@link RocksDBStateBackend}. + */ +public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBackend> { + + private File dbDir; + private File chkDir; + + @Override + protected RocksDBStateBackend getStateBackend() throws IOException { + dbDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); + chkDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); + + return new RocksDBStateBackend(dbDir.getAbsolutePath(), "file://" + chkDir.getAbsolutePath(), new MemoryStateBackend()); + } + + @Override + protected void cleanup() { + try { + FileUtils.deleteDirectory(dbDir); + FileUtils.deleteDirectory(chkDir); + } catch (IOException ignore) {} + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..0b686e5 --- /dev/null +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF and its only appender to A1. +log4j.rootLogger=OFF, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j.properties b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j.properties new file mode 100644 index 0000000..ed2bbcb --- /dev/null +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# This file ensures that tests executed from the IDE show log output + +log4j.rootLogger=OFF, console + +# Log output to the console (System.err) +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target = System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/resources/logback-test.xml b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/logback-test.xml new file mode 100644 index 0000000..4f56748 --- /dev/null +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/logback-test.xml @@ -0,0 +1,30 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. 
The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> + <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/pom.xml ---------------------------------------------------------------------- diff --git a/flink-contrib/pom.xml b/flink-contrib/pom.xml index 82b6211..76f0f88 100644 --- a/flink-contrib/pom.xml +++ b/flink-contrib/pom.xml @@ -43,5 +43,6 @@ under the License. <module>flink-tweet-inputformat</module> <module>flink-operator-stats</module> <module>flink-connector-wikiedits</module> + <module>flink-statebackend-rocksdb</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-core/src/main/java/org/apache/flink/util/ExternalProcessRunner.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/ExternalProcessRunner.java b/flink-core/src/main/java/org/apache/flink/util/ExternalProcessRunner.java new file mode 100644 index 0000000..8e4725c --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/ExternalProcessRunner.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.util; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Utility class for running a class in an external process. This will try to find the java + * executable in common places and will use the classpath of the current process as the classpath + * of the new process. 
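+ * The standard error output of the spawned process is captured and can be inspected via {@link #getErrorOutput()}.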
+ * + * <p>Attention: The entry point class must be in the classpath of the currently running process, + * otherwise the newly spawned process will not find it and fail. + */ +public class ExternalProcessRunner { + private final String entryPointClassName; + + private final Process process; + + final StringWriter errorOutput = new StringWriter(); + + /** + * Creates a new {@code ExternalProcessRunner} that runs the given class with the given parameters. + * The class must have a "main" method. + */ + public ExternalProcessRunner(String entryPointClassName, String[] parameters) throws IOException { + this.entryPointClassName = entryPointClassName; + + String javaCommand = getJavaCommandPath(); + + List<String> commandList = new ArrayList<>(); + + commandList.add(javaCommand); + commandList.add("-classpath"); + commandList.add(getCurrentClasspath()); + commandList.add(entryPointClassName); + + Collections.addAll(commandList, parameters); + + process = new ProcessBuilder(commandList).start(); + + new PipeForwarder(process.getErrorStream(), errorOutput); + } + + /** + * Get the buffered stderr output of the process. + */ + public StringWriter getErrorOutput() { + return errorOutput; + } + + /** + * Start the external process, wait for it to finish and return the exit code of that process. + * + * <p>If this method is interrupted it will destroy the external process and forward the + * {@code InterruptedException}. + */ + public int run() throws Exception { + try { + int returnCode = process.waitFor(); + + if (returnCode != 0) { + // determine whether we failed because of a ClassNotFoundException and forward that + if (getErrorOutput().toString().contains("Error: Could not find or load main class " + entryPointClassName)) { + throw new ClassNotFoundException("Error: Could not find or load main class " + entryPointClassName); + } + + } + return returnCode; + } catch (InterruptedException e) { + try { + Class<?> processClass = process.getClass(); + Method destroyForcibly = processClass.getMethod("destroyForcibly"); + destroyForcibly.setAccessible(true); + destroyForcibly.invoke(process); + } catch (NoSuchMethodException ex) { + // we don't have destroyForcibly + process.destroy(); + } + throw new InterruptedException("Interrupted while waiting for external process."); + } + } + + /** + * Tries to get the java executable command with which the current JVM was started. + * Returns null if the command could not be found. + * + * @return The java executable command. 
+	 */
+	public static String getJavaCommandPath() {
+
+		try {
+			ProcessBuilder bld = new ProcessBuilder("java", "-version");
+			Process process = bld.start();
+			if (process.waitFor() == 0) {
+				return "java";
+			}
+		}
+		catch (Throwable t) {
+			// ignore and try the next candidate
+		}
+
+		try {
+			ProcessBuilder bld = new ProcessBuilder("java.exe", "-version");
+			Process process = bld.start();
+			if (process.waitFor() == 0) {
+				return "java.exe";
+			}
+		}
+		catch (Throwable t) {
+			// ignore and try the next candidate
+		}
+
+		File javaHome = new File(System.getProperty("java.home"));
+
+		String path1 = new File(javaHome, "java").getAbsolutePath();
+		String path2 = new File(new File(javaHome, "bin"), "java").getAbsolutePath();
+
+		try {
+			ProcessBuilder bld = new ProcessBuilder(path1, "-version");
+			Process process = bld.start();
+			if (process.waitFor() == 0) {
+				return path1;
+			}
+		}
+		catch (Throwable t) {
+			// ignore and try the next candidate
+		}
+
+		try {
+			ProcessBuilder bld = new ProcessBuilder(path2, "-version");
+			Process process = bld.start();
+			if (process.waitFor() == 0) {
+				return path2;
+			}
+		}
+		catch (Throwable tt) {
+			// ignore and try the next candidate
+		}
+
+		String path3 = new File(javaHome, "java.exe").getAbsolutePath();
+		String path4 = new File(new File(javaHome, "bin"), "java.exe").getAbsolutePath();
+
+		try {
+			ProcessBuilder bld = new ProcessBuilder(path3, "-version");
+			Process process = bld.start();
+			if (process.waitFor() == 0) {
+				return path3;
+			}
+		}
+		catch (Throwable t) {
+			// ignore and try the next candidate
+		}
+
+		try {
+			ProcessBuilder bld = new ProcessBuilder(path4, "-version");
+			Process process = bld.start();
+			if (process.waitFor() == 0) {
+				return path4;
+			}
+		}
+		catch (Throwable tt) {
+			// no luck with any candidate
+		}
+		return null;
+	}
+
+	/**
+	 * Gets the classpath with which the current JVM was started.
+	 *
+	 * @return The classpath with which the current JVM was started.
+	 */
+	public static String getCurrentClasspath() {
+		RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
+		return bean.getClassPath();
+	}
+
+	/**
+	 * Utility class to read the output of a process stream and forward it into a StringWriter.
+	 */
+	public static class PipeForwarder extends Thread {
+
+		private final StringWriter target;
+		private final InputStream source;
+
+		public PipeForwarder(InputStream source, StringWriter target) {
+			super("Pipe Forwarder");
+			setDaemon(true);
+
+			this.source = source;
+			this.target = target;
+
+			start();
+		}
+
+		@Override
+		public void run() {
+			try {
+				int next;
+				while ((next = source.read()) != -1) {
+					target.write(next);
+				}
+			}
+			catch (IOException e) {
+				// stream closed, the external process terminated
+			}
+		}
+	}
+} http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-core/src/main/java/org/apache/flink/util/HDFSCopyFromLocal.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/HDFSCopyFromLocal.java b/flink-core/src/main/java/org/apache/flink/util/HDFSCopyFromLocal.java
new file mode 100644
index 0000000..cf6780b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/HDFSCopyFromLocal.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.net.URI;
+
+/**
+ * Utility for copying from the local file system to an HDFS {@link FileSystem} in an external
+ * process. This is required since {@code FileSystem.copyFromLocalFile} does not respond to
+ * thread interrupts; running the copy in a separate process makes it possible to abort it by
+ * killing that process.
+ */
+public class HDFSCopyFromLocal {
+	public static void main(String[] args) throws Exception {
+		String localBackupPath = args[0];
+		String backupUri = args[1];
+
+		FileSystem fs = FileSystem.get(new URI(backupUri), new Configuration());
+
+		fs.copyFromLocalFile(new Path(localBackupPath), new Path(backupUri));
+	}
+
+	public static void copyFromLocal(File localPath, URI remotePath) throws Exception {
+		ExternalProcessRunner processRunner = new ExternalProcessRunner(HDFSCopyFromLocal.class.getName(),
+			new String[]{localPath.getAbsolutePath(), remotePath.toString()});
+		if (processRunner.run() != 0) {
+			throw new RuntimeException("Error while copying to remote FileSystem: " + processRunner.getErrorOutput());
+		}
+	}
+} http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-core/src/main/java/org/apache/flink/util/HDFSCopyToLocal.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/HDFSCopyToLocal.java b/flink-core/src/main/java/org/apache/flink/util/HDFSCopyToLocal.java
new file mode 100644
index 0000000..813f768
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/HDFSCopyToLocal.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.net.URI;
+
+/**
+ * Utility for copying from an HDFS {@link FileSystem} to the local file system in an external
+ * process. This is required since {@code FileSystem.copyToLocalFile} does not respond to thread
+ * interrupts; running the copy in a separate process makes it possible to abort it by killing
+ * that process.
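+ *
+ * <p>Example (illustrative only; the paths are hypothetical):
+ * {@code HDFSCopyToLocal.copyToLocal(new URI("hdfs:///backups/chk-42"), new File("/tmp/restore"))}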
+ */ +public class HDFSCopyToLocal { + public static void main(String[] args) throws Exception { + String backupUri = args[0]; + String dbPath = args[1]; + + FileSystem fs = FileSystem.get(new URI(backupUri), new Configuration()); + + fs.copyToLocalFile(new Path(backupUri), new Path(dbPath)); + } + + public static void copyToLocal(URI remotePath, File localPath) throws Exception { + ExternalProcessRunner processRunner = new ExternalProcessRunner(HDFSCopyToLocal.class.getName(), + new String[]{remotePath.toString(), localPath.getAbsolutePath()}); + if (processRunner.run() != 0) { + throw new RuntimeException("Error while copying from remote FileSystem: " + processRunner.getErrorOutput()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-core/src/test/java/org/apache/flink/util/ExternalProcessRunnerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/util/ExternalProcessRunnerTest.java b/flink-core/src/test/java/org/apache/flink/util/ExternalProcessRunnerTest.java new file mode 100644 index 0000000..5ebe772 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/util/ExternalProcessRunnerTest.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ExternalProcessRunnerTest {
+
+	@Test(expected = ClassNotFoundException.class)
+	public void testClassNotFound() throws Exception {
+		ExternalProcessRunner runner = new ExternalProcessRunner("MyClassThatDoesNotExist", new String[]{});
+		runner.run();
+	}
+
+	@Test
+	public void testInterrupting() throws Exception {
+
+		final ExternalProcessRunner runner = new ExternalProcessRunner(InfiniteLoop.class.getName(), new String[]{});
+
+		Thread thread = new Thread() {
+			@Override
+			public void run() {
+				try {
+					runner.run();
+				} catch (InterruptedException e) {
+					// this is expected
+				} catch (Exception e) {
+					fail("Unexpected exception received " + e);
+				}
+			}
+		};
+
+		thread.start();
+		thread.interrupt();
+		thread.join();
+	}
+
+	@Test
+	public void testPrintToErr() throws Exception {
+		final ExternalProcessRunner runner = new ExternalProcessRunner(PrintToError.class.getName(), new String[]{"hello42"});
+
+		int result = runner.run();
+
+		assertEquals(0, result);
+		assertEquals("Hello process hello42\n", runner.getErrorOutput().toString());
+	}
+
+	@Test
+	public void testFailing() throws Exception {
+		final ExternalProcessRunner runner = new ExternalProcessRunner(Failing.class.getName(), new String[]{});
+
+		int result = runner.run();
+
+		assertEquals(1, result);
+		// this needs to be adapted if the test changes because it contains the line number
+		assertEquals("Exception in thread \"main\" java.lang.RuntimeException: HEHE, I'm failing.\n" +
+			"\tat org.apache.flink.util.ExternalProcessRunnerTest$Failing.main(ExternalProcessRunnerTest.java:94)\n", runner.getErrorOutput().toString());
+	}
+
+
+	public static class InfiniteLoop {
+		public static void main(String[] args) {
+			while (true) {
+			}
+		}
+	}
+
+	public static class PrintToError {
+		public static void main(String[] args) {
+			System.err.println("Hello process " + args[0]);
+		}
+	}
+
+	public static class Failing {
+		public static void main(String[] args) {
+			throw new RuntimeException("HEHE, I'm failing.");
+		}
+	}
+
+} http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 996f5e0..73a3d66 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -175,6 +175,13 @@ under the License.
<version>${guava.version}</version> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-statebackend-rocksdb_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java index 5886982..9bc0040 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java @@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; @@ -112,6 +113,12 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { String backups = tempFolder.newFolder().getAbsolutePath(); this.stateBackend = new FsStateBackend("file://" + backups); break; + case ROCKSDB: + String rocksDb = tempFolder.newFolder().getAbsolutePath(); + String rocksDbBackups = tempFolder.newFolder().getAbsolutePath(); + + this.stateBackend = new RocksDBStateBackend(rocksDb, "file://" + rocksDbBackups, new MemoryStateBackend()); + break; } } @@ -739,6 +746,8 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { return Arrays.asList(new Object[][] { {StateBackendEnum.MEM}, {StateBackendEnum.FILE}, +// {StateBackendEnum.DB}, + {StateBackendEnum.ROCKSDB} } ); }
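
For readers who want to try the new backend outside of the test suite, a minimal job setup might look like the following sketch. It is based on the three-argument constructor used in the ITCase above; the directory paths are placeholders, and the use of StreamExecutionEnvironment#setStateBackend and enableCheckpointing is an assumption about the surrounding streaming API, not something introduced by this commit.

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RocksDBBackendSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Arguments: a local working directory for the RocksDB instances,
            // a URI to which backups are copied on checkpoints, and a nested
            // backend for non-partitioned state (here: in-memory).
            env.setStateBackend(new RocksDBStateBackend(
                    "/tmp/rocksdb-work",              // placeholder local path
                    "file:///tmp/rocksdb-backups",    // placeholder backup URI
                    new MemoryStateBackend()));

            env.enableCheckpointing(1000); // checkpoint every second

            // ... build a keyed streaming topology and call env.execute() ...
        }
    }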
