http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java deleted file mode 100644 index 0086ac6..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state; - -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.IllegalConfigurationException; - -import java.io.Serializable; - -/** - * State handler provider factory. - * - * <p>This is going to be superseded soon. - */ -public class StateHandleProviderFactory { - - /** - * Creates a {@link org.apache.flink.runtime.state.FileStateHandle.FileStateHandleProvider} at - * the configured recovery path. 
- */ - public static <T extends Serializable> StateHandleProvider<T> createRecoveryFileStateHandleProvider( - Configuration config) { - - StateBackend stateBackend = StateBackend.fromConfig(config); - - if (stateBackend == StateBackend.FILESYSTEM) { - String recoveryPath = config.getString( - ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, ""); - - if (recoveryPath.equals("")) { - throw new IllegalConfigurationException("Missing recovery path. Specify via " + - "configuration key '" + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "'."); - } - else { - return FileStateHandle.createProvider(recoveryPath); - } - } - else { - throw new IllegalConfigurationException("Unexpected state backend configuration " + - stateBackend); - } - } - -}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java new file mode 100644 index 0000000..32c601e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.runtime.state.StateHandle; + +import java.io.InputStream; + +/** + * A state handle that produces an input stream when resolved. 
+ */ +public interface StreamStateHandle extends StateHandle<InputStream> {} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java new file mode 100644 index 0000000..d64e2c4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; + +import java.io.IOException; + +/** + * Base class for state that is stored in a file. 
+ */ +public abstract class AbstractFileState implements java.io.Serializable { + + private static final long serialVersionUID = 350284443258002355L; + + /** The path to the file in the filesystem, fully describing the file system */ + private final Path filePath; + + /** Cached file system handle */ + private transient FileSystem fs; + + /** + * Creates a new file state for the given file path. + * + * @param filePath The path to the file that stores the state. + */ + protected AbstractFileState(Path filePath) { + this.filePath = filePath; + } + + /** + * Gets the path where this handle's state is stored. + * @return The path where this handle's state is stored. + */ + public Path getFilePath() { + return filePath; + } + + /** + * Discard the state by deleting the file that stores the state. If the parent directory + * of the state is empty after deleting the state file, it is also deleted. + * + * @throws Exception Thrown, if the file deletion (not the directory deletion) fails. + */ + public void discardState() throws Exception { + getFileSystem().delete(filePath, false); + + // send a call to delete the directory containing the file. this will + // fail (and be ignored) when some files still exist + try { + getFileSystem().delete(filePath.getParent(), false); + } catch (IOException ignored) {} + } + + /** + * Gets the file system that stores the file state. + * @return The file system that stores the file state. + * @throws IOException Thrown if the file system cannot be accessed. 
+ */ + protected FileSystem getFileSystem() throws IOException { + if (fs == null) { + fs = FileSystem.get(filePath.toUri()); + } + return fs; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java new file mode 100644 index 0000000..b7e7cd1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.util.InstantiationUtil; + +import java.io.ObjectInputStream; + +/** + * A state handle that points to state stored in a file via Java Serialization. 
+ * + * @param <T> The type of state pointed to by the state handle. + */ +public class FileSerializableStateHandle<T> extends AbstractFileState implements StateHandle<T> { + + private static final long serialVersionUID = -657631394290213622L; + + /** + * Creates a new FileSerializableStateHandle pointing to state at the given file path. + * + * @param filePath The path to the file containing the checkpointed state. + */ + public FileSerializableStateHandle(Path filePath) { + super(filePath); + } + + @Override + @SuppressWarnings("unchecked") + public T getState(ClassLoader classLoader) throws Exception { + FSDataInputStream inStream = getFileSystem().open(getFilePath()); + ObjectInputStream ois = new InstantiationUtil.ClassLoaderObjectInputStream(inStream, classLoader); + return (T) ois.readObject(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java new file mode 100644 index 0000000..f4681ea --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.InputStream; + +/** + * A state handle that points to state in a file system, accessible as an input stream. + */ +public class FileStreamStateHandle extends AbstractFileState implements StreamStateHandle { + + private static final long serialVersionUID = -6826990484549987311L; + + /** + * Creates a new FileStreamStateHandle pointing to state at the given file path. + * + * @param filePath The path to the file containing the checkpointed state. + */ + public FileStreamStateHandle(Path filePath) { + super(filePath); + } + + @Override + public InputStream getState(ClassLoader userCodeClassLoader) throws Exception { + return getFileSystem().open(getFilePath()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvState.java new file mode 100644 index 0000000..e3116dd --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvState.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper; +import org.apache.flink.runtime.state.AbstractHeapKvState; + +import java.io.DataOutputStream; +import java.util.HashMap; + +/** + * Heap-backed key/value state that is snapshotted into files. + * + * @param <K> The type of the key. + * @param <V> The type of the value. + */ +public class FsHeapKvState<K, V> extends AbstractHeapKvState<K, V, FsStateBackend> { + + /** The file system state backend backing snapshots of this state */ + private final FsStateBackend backend; + + /** + * Creates a new and empty key/value state. + * + * @param keySerializer The serializer for the key. + * @param valueSerializer The serializer for the value. + * @param defaultValue The value that is returned when no other value has been associated with a key, yet. 
+ * @param backend The file system state backend backing snapshots of this state + */ + public FsHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, + V defaultValue, FsStateBackend backend) { + super(keySerializer, valueSerializer, defaultValue); + this.backend = backend; + } + + /** + * Creates a new key/value state with the given state contents. + * This method is used to re-create key/value state with existing data, for example from + * a snapshot. + * + * @param keySerializer The serializer for the key. + * @param valueSerializer The serializer for the value. + * @param defaultValue The value that is returned when no other value has been associated with a key, yet. + * @param state The map of key/value pairs to initialize the state with. + * @param backend The file system state backend backing snapshots of this state + */ + public FsHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, + V defaultValue, HashMap<K, V> state, FsStateBackend backend) { + super(keySerializer, valueSerializer, defaultValue, state); + this.backend = backend; + } + + + @Override + public FsHeapKvStateSnapshot<K, V> shapshot(long checkpointId, long timestamp) throws Exception { + // first, create an output stream to write to + try (FsStateBackend.FsCheckpointStateOutputStream out = + backend.createCheckpointStateOutputStream(checkpointId, timestamp)) { + + // serialize the state to the output stream + OutputViewDataOutputStreamWrapper outView = + new OutputViewDataOutputStreamWrapper(new DataOutputStream(out)); + outView.writeInt(size()); + writeStateToOutputView(outView); + outView.flush(); + + // create a handle to the state + return new FsHeapKvStateSnapshot<>(getKeySerializer(), getValueSerializer(), out.closeAndGetPath()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java new file mode 100644 index 0000000..781ee3d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.InputViewDataInputStreamWrapper; +import org.apache.flink.runtime.state.KvStateSnapshot; + +import java.io.DataInputStream; +import java.util.HashMap; + +/** + * A snapshot of a heap key/value state stored in a file. + * + * @param <K> The type of the key in the snapshot state. + * @param <V> The type of the value in the snapshot state. 
+ */ +public class FsHeapKvStateSnapshot<K, V> extends AbstractFileState implements KvStateSnapshot<K, V, FsStateBackend> { + + private static final long serialVersionUID = 1L; + + /** Name of the key serializer class */ + private final String keySerializerClassName; + + /** Name of the value serializer class */ + private final String valueSerializerClassName; + + /** + * Creates a new state snapshot with data in the file system. + * + * @param keySerializer The serializer for the keys. + * @param valueSerializer The serializer for the values. + * @param filePath The path where the snapshot data is stored. + */ + public FsHeapKvStateSnapshot(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, Path filePath) { + super(filePath); + this.keySerializerClassName = keySerializer.getClass().getName(); + this.valueSerializerClassName = valueSerializer.getClass().getName(); + } + + @Override + public FsHeapKvState<K, V> restoreState( + FsStateBackend stateBackend, + final TypeSerializer<K> keySerializer, + final TypeSerializer<V> valueSerializer, + V defaultValue, + ClassLoader classLoader) throws Exception { + + // validity checks + if (!keySerializer.getClass().getName().equals(keySerializerClassName) || + !valueSerializer.getClass().getName().equals(valueSerializerClassName)) { + throw new IllegalArgumentException( + "Cannot restore the state from the snapshot with the given serializers. 
" + + "State (K/V) was serialized with (" + valueSerializerClassName + + "/" + keySerializerClassName + ")"); + } + + // state restore + try (FSDataInputStream inStream = stateBackend.getFileSystem().open(getFilePath())) { + InputViewDataInputStreamWrapper inView = new InputViewDataInputStreamWrapper(new DataInputStream(inStream)); + + final int numEntries = inView.readInt(); + HashMap<K, V> stateMap = new HashMap<>(numEntries); + + for (int i = 0; i < numEntries; i++) { + K key = keySerializer.deserialize(inView); + V value = valueSerializer.deserialize(inView); + stateMap.put(key, value); + } + + return new FsHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue, stateMap, stateBackend); + } + catch (Exception e) { + throw new Exception("Failed to restore state from file system", e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java new file mode 100644 index 0000000..045c411 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateBackend; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.UUID; + +/** + * The file state backend is a state backend that stores the state of streaming jobs in a file system. + * + * <p>The state backend has one core directory into which it puts all checkpoint data. 
Inside that + * directory, it creates a directory per job, inside which each checkpoint gets a directory, with + * files for each state, for example: + * + * {@code hdfs://namenode:port/flink-checkpoints/<job-id>/chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 } + */ +public class FsStateBackend extends StateBackend<FsStateBackend> { + + private static final long serialVersionUID = -8191916350224044011L; + + private static final Logger LOG = LoggerFactory.getLogger(FsStateBackend.class); + + + /** The path to the directory for the checkpoint data, including the file system + * description via scheme and optional authority */ + private final Path basePath; + + /** The directory (job specific) into which this initialized instance of the backend stores its data */ + private transient Path checkpointDirectory; + + /** Cached handle to the file system for file operations */ + private transient FileSystem filesystem; + + + /** + * Creates a new state backend that stores its checkpoint data in the file system and location + * defined by the given URI. + * + * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') + * must be accessible via {@link FileSystem#get(URI)}. + * + * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority + * (host and port), or that the Hadoop configuration that describes that information must be in the + * classpath. + * + * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), + * and the path to the checkpoint data directory. + * @throws IOException Thrown, if no file system can be found for the scheme in the URI. + */ + public FsStateBackend(String checkpointDataUri) throws IOException { + this(new Path(checkpointDataUri)); + } + + /** + * Creates a new state backend that stores its checkpoint data in the file system and location + * defined by the given URI. 
+ * + * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') + * must be accessible via {@link FileSystem#get(URI)}. + * + * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority + * (host and port), or that the Hadoop configuration that describes that information must be in the + * classpath. + * + * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), + * and the path to the checkpoint data directory. + * @throws IOException Thrown, if no file system can be found for the scheme in the URI. + */ + public FsStateBackend(Path checkpointDataUri) throws IOException { + this(checkpointDataUri.toUri()); + } + + /** + * Creates a new state backend that stores its checkpoint data in the file system and location + * defined by the given URI. + * + * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') + * must be accessible via {@link FileSystem#get(URI)}. + * + * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority + * (host and port), or that the Hadoop configuration that describes that information must be in the + * classpath. + * + * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), + * and the path to the checkpoint data directory. + * @throws IOException Thrown, if no file system can be found for the scheme in the URI. + */ + public FsStateBackend(URI checkpointDataUri) throws IOException { + final String scheme = checkpointDataUri.getScheme(); + final String path = checkpointDataUri.getPath(); + + // some validity checks + if (scheme == null) { + throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " + + "Please specify the file system scheme explicitly in the URI."); + } + if (path == null) { + throw new IllegalArgumentException("The path to store the checkpoint data in is null. 
" + + "Please specify a directory path for the checkpoint data."); + } + if (path.length() == 0 || path.equals("/")) { + throw new IllegalArgumentException("Cannot use the root directory for checkpoints."); + } + + // we do a bit of work to make sure that the URI for the filesystem refers to exactly the same + // (distributed) filesystem on all hosts and includes full host/port information, even if the + // original URI did not include that. We count on the filesystem loading from the configuration + // to fill in the missing data. + + // try to grab the file system for this path/URI + this.filesystem = FileSystem.get(checkpointDataUri); + if (this.filesystem == null) { + throw new IOException("Could not find a file system for the given scheme in the available configurations."); + } + + URI fsURI = this.filesystem.getUri(); + try { + URI baseURI = new URI(fsURI.getScheme(), fsURI.getAuthority(), path, null, null); + this.basePath = new Path(baseURI); + } + catch (URISyntaxException e) { + throw new IOException( + String.format("Cannot create file system URI for checkpointDataUri %s and filesystem URI %s", + checkpointDataUri, fsURI), e); + } + } + + /** + * Gets the base directory where all state-containing files are stored. + * The job specific directory is created inside this directory. + * + * @return The base directory. + */ + public Path getBasePath() { + return basePath; + } + + /** + * Gets the directory where this state backend stores its checkpoint data. Will be null if + * the state backend has not been initialized. + * + * @return The directory where this state backend stores its checkpoint data. + */ + public Path getCheckpointDirectory() { + return checkpointDirectory; + } + + /** + * Checks whether this state backend is initialized. Note that initialization does not carry + * across serialization. After each serialization, the state backend needs to be initialized. + * + * @return True, if the file state backend has been initialized, false otherwise. 
+ */ + public boolean isInitialized() { + return filesystem != null && checkpointDirectory != null; + } + + /** + * Gets the file system handle for the file system that stores the state for this backend. + * + * @return This backend's file system handle. + */ + public FileSystem getFileSystem() { + if (filesystem != null) { + return filesystem; + } + else { + throw new IllegalStateException("State backend has not been initialized."); + } + } + + // ------------------------------------------------------------------------ + // initialization and cleanup + // ------------------------------------------------------------------------ + + @Override + public void initializeForJob(JobID jobId) throws Exception { + Path dir = new Path(basePath, jobId.toString()); + + LOG.info("Initializing file state backend to URI " + dir); + + filesystem = basePath.getFileSystem(); + filesystem.mkdirs(dir); + + checkpointDirectory = dir; + } + + @Override + public void disposeAllStateForCurrentJob() throws Exception { + FileSystem fs = this.filesystem; + Path dir = this.checkpointDirectory; + + if (fs != null && dir != null) { + this.filesystem = null; + this.checkpointDirectory = null; + fs.delete(dir, true); + } + else { + throw new IllegalStateException("state backend has not been initialized"); + } + } + + @Override + public void close() throws Exception {} + + // ------------------------------------------------------------------------ + // state backend operations + // ------------------------------------------------------------------------ + + @Override + public <K, V> FsHeapKvState<K, V> createKvState( + TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws Exception { + return new FsHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue, this); + } + + @Override + public <S extends Serializable> StateHandle<S> checkpointStateSerializable( + S state, long checkpointID, long timestamp) throws Exception + { + checkFileSystemInitialized(); + 
+ // make sure the directory for that specific checkpoint exists + final Path checkpointDir = createCheckpointDirPath(checkpointID); + filesystem.mkdirs(checkpointDir); + + + Exception latestException = null; + + for (int attempt = 0; attempt < 10; attempt++) { + Path targetPath = new Path(checkpointDir, UUID.randomUUID().toString()); + FSDataOutputStream outStream; + try { + outStream = filesystem.create(targetPath, false); + } + catch (Exception e) { + latestException = e; + continue; + } + + ObjectOutputStream os = new ObjectOutputStream(outStream); + os.writeObject(state); + os.close(); + return new FileSerializableStateHandle<S>(targetPath); + } + + throw new Exception("Could not open output stream for state backend", latestException); + } + + @Override + public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception { + checkFileSystemInitialized(); + + final Path checkpointDir = createCheckpointDirPath(checkpointID); + filesystem.mkdirs(checkpointDir); + + Exception latestException = null; + + for (int attempt = 0; attempt < 10; attempt++) { + Path targetPath = new Path(checkpointDir, UUID.randomUUID().toString()); + try { + FSDataOutputStream outStream = filesystem.create(targetPath, false); + return new FsCheckpointStateOutputStream(outStream, targetPath, filesystem); + } + catch (Exception e) { + latestException = e; + } + } + throw new Exception("Could not open output stream for state backend", latestException); + } + + // ------------------------------------------------------------------------ + // utilities + // ------------------------------------------------------------------------ + + private void checkFileSystemInitialized() throws IllegalStateException { + if (filesystem == null || checkpointDirectory == null) { + throw new IllegalStateException("filesystem has not been re-initialized after deserialization"); + } + } + + private Path createCheckpointDirPath(long checkpointID) { + return 
new Path(checkpointDirectory, "chk-" + checkpointID); + } + + @Override + public String toString() { + return checkpointDirectory == null ? + "File State Backend @ " + basePath : + "File State Backend (initialized) @ " + checkpointDirectory; + } + + // ------------------------------------------------------------------------ + // Output stream for state checkpointing + // ------------------------------------------------------------------------ + + /** + * A CheckpointStateOutputStream that writes into a file and returns the path to that file upon + * closing. + */ + public static final class FsCheckpointStateOutputStream extends CheckpointStateOutputStream { + + private final FSDataOutputStream outStream; + + private final Path filePath; + + private final FileSystem fs; + + private boolean closed; + + FsCheckpointStateOutputStream(FSDataOutputStream outStream, Path filePath, FileSystem fs) { + this.outStream = outStream; + this.filePath = filePath; + this.fs = fs; + } + + + @Override + public void write(int b) throws IOException { + outStream.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + outStream.write(b, off, len); + } + + @Override + public void flush() throws IOException { + outStream.flush(); + } + + /** + * If the stream is only closed, we remove the produced file (cleanup through the auto close + * feature, for example). This method throws no exception if the deletion fails, but only + * logs the error. 
+ */ + @Override + public void close() { + synchronized (this) { + if (!closed) { + closed = true; + try { + outStream.close(); + fs.delete(filePath, false); + + // attempt to delete the parent (will fail and be ignored if the parent has more files) + try { + fs.delete(filePath.getParent(), false); + } catch (IOException ignored) {} + } + catch (Exception e) { + LOG.warn("Cannot delete closed and discarded state stream to " + filePath, e); + } + } + } + } + + @Override + public FileStreamStateHandle closeAndGetHandle() throws IOException { + return new FileStreamStateHandle(closeAndGetPath()); + } + + /** + * Closes the stream and returns the path to the file that contains the stream's data. + * @return The path to the file that contains the stream's data. + * @throws IOException Thrown if the stream cannot be successfully closed. + */ + public Path closeAndGetPath() throws IOException { + synchronized (this) { + if (!closed) { + closed = true; + outStream.close(); + return filePath; + } + else { + throw new IOException("Stream has already been closed and discarded."); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java new file mode 100644 index 0000000..e687f7f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.StateBackendFactory; + +/** + * A factory that creates an {@link org.apache.flink.runtime.state.filesystem.FsStateBackend} + * from a configuration. + */ +public class FsStateBackendFactory implements StateBackendFactory<FsStateBackend> { + + /** The key under which the config stores the directory where checkpoints should be stored */ + public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.fs.checkpointdir"; + + + /** + * Creates a file system state backend for the checkpoint directory URI that the configuration + * stores under {@link #CHECKPOINT_DIRECTORY_URI_CONF_KEY}. + * + * @param config The configuration to read the checkpoint directory URI from. + * @return A file system state backend rooted at the configured URI. + * @throws IllegalConfigurationException Thrown, if the configuration has no checkpoint directory entry. + * @throws Exception Thrown, if the configured URI is rejected with an IllegalArgumentException. + */ + @Override + public FsStateBackend createFromConfig(Configuration config) throws Exception { + String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null); + + if (checkpointDirURI == null) { + throw new IllegalConfigurationException( + "Cannot create the file system state backend: The configuration does not specify the " + + "checkpoint directory '" + CHECKPOINT_DIRECTORY_URI_CONF_KEY + '\''); + } + + try { + Path path = new Path(checkpointDirURI); + return new FsStateBackend(path); + } + catch (IllegalArgumentException e) { + // close the quote that is opened in front of the URI + throw new Exception("Cannot initialize File System State Backend with URI '" + + checkpointDirURI + "'.", e); + } + + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java new file mode 100644 index 0000000..29762f7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.memory; + +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; + +/** + * A state handle that contains stream state in a byte array. + */ +public final class ByteStreamStateHandle implements StreamStateHandle { + + private static final long serialVersionUID = -5280226231200217594L; + + /** the state data */ + private final byte[] data; + + /** + * Creates a new ByteStreamStateHandle containing the given data. + * + * @param data The state data. 
+ */ + public ByteStreamStateHandle(byte[] data) { + this.data = data; + } + + @Override + public InputStream getState(ClassLoader userCodeClassLoader) { + return new ByteArrayInputStream(data); + } + + @Override + public void discardState() {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemHeapKvState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemHeapKvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemHeapKvState.java new file mode 100644 index 0000000..96cb440 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemHeapKvState.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.memory; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.util.DataOutputSerializer; +import org.apache.flink.runtime.state.AbstractHeapKvState; + +import java.util.HashMap; + +/** + * Heap-backed key/value state that is snapshotted into a serialized memory copy. 
+ * + * @param <K> The type of the key. + * @param <V> The type of the value. + */ +public class MemHeapKvState<K, V> extends AbstractHeapKvState<K, V, MemoryStateBackend> { + + public MemHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) { + super(keySerializer, valueSerializer, defaultValue); + } + + public MemHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, + V defaultValue, HashMap<K, V> state) { + super(keySerializer, valueSerializer, defaultValue, state); + } + + @Override + public MemoryHeapKvStateSnapshot<K, V> shapshot(long checkpointId, long timestamp) throws Exception { + DataOutputSerializer ser = new DataOutputSerializer(Math.max(size() * 16, 16)); + writeStateToOutputView(ser); + byte[] bytes = ser.getCopyOfBuffer(); + + return new MemoryHeapKvStateSnapshot<K, V>(getKeySerializer(), getValueSerializer(), bytes, size()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryHeapKvStateSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryHeapKvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryHeapKvStateSnapshot.java new file mode 100644 index 0000000..1b03def --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryHeapKvStateSnapshot.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.memory; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.util.DataInputDeserializer; +import org.apache.flink.runtime.state.KvStateSnapshot; + +import java.util.HashMap; + +/** + * A snapshot of a {@link MemHeapKvState} for a checkpoint. The data is stored in a heap byte + * array, in serialized form. + * + * @param <K> The type of the key in the snapshot state. + * @param <V> The type of the value in the snapshot state. + */ +public class MemoryHeapKvStateSnapshot<K, V> implements KvStateSnapshot<K, V, MemoryStateBackend> { + + private static final long serialVersionUID = 1L; + + /** Name of the key serializer class */ + private final String keySerializerClassName; + + /** Name of the value serializer class */ + private final String valueSerializerClassName; + + /** The serialized data of the state key/value pairs */ + private final byte[] data; + + /** The number of key/value pairs */ + private final int numEntries; + + /** + * Creates a new heap memory state snapshot. + * + * @param keySerializer The serializer for the keys. + * @param valueSerializer The serializer for the values. 
+ * @param data The serialized data of the state key/value pairs + * @param numEntries The number of key/value pairs + */ + public MemoryHeapKvStateSnapshot(TypeSerializer<K> keySerializer, + TypeSerializer<V> valueSerializer, byte[] data, int numEntries) { + this.keySerializerClassName = keySerializer.getClass().getName(); + this.valueSerializerClassName = valueSerializer.getClass().getName(); + this.data = data; + this.numEntries = numEntries; + } + + + /** + * Rebuilds the key/value state by deserializing the recorded number of key/value pairs + * from the snapshot's byte array. + * + * @throws IllegalArgumentException Thrown, if the class of a given serializer differs from the + * serializer class name recorded when the snapshot was created. + */ + @Override + public MemHeapKvState<K, V> restoreState( + MemoryStateBackend stateBackend, + final TypeSerializer<K> keySerializer, + final TypeSerializer<V> valueSerializer, + V defaultValue, + ClassLoader classLoader) throws Exception { + + // validity checks + if (!keySerializer.getClass().getName().equals(keySerializerClassName) || + !valueSerializer.getClass().getName().equals(valueSerializerClassName)) { + // report the serializers in key/value order, matching the "(K/V)" label + throw new IllegalArgumentException( + "Cannot restore the state from the snapshot with the given serializers. " + + "State (K/V) was serialized with (" + keySerializerClassName + + "/" + valueSerializerClassName + ")"); + } + + // restore state + HashMap<K, V> stateMap = new HashMap<>(numEntries); + DataInputDeserializer in = new DataInputDeserializer(data, 0, data.length); + + for (int i = 0; i < numEntries; i++) { + K key = keySerializer.deserialize(in); + V value = valueSerializer.deserialize(in); + stateMap.put(key, value); + } + + return new MemHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue, stateMap); + } + + /** + * Discarding the heap state is a no-op. 
+ */ + @Override + public void discardState() {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java new file mode 100644 index 0000000..8d297d4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.memory; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; + +/** + * A {@link StateBackend} that stores all its data and checkpoints in memory and has no + * capabilities to spill to disk. Checkpoints are serialized and the serialized data is + * transferred to the JobManager. + */ +public class MemoryStateBackend extends StateBackend<MemoryStateBackend> { + + private static final long serialVersionUID = 4109305377809414635L; + + /** The default maximal size that the snapshotted memory state may have (5 MiBytes) */ + private static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024; + + /** The maximal size that the snapshotted memory state may have */ + private final int maxStateSize; + + /** + * Creates a new memory state backend that accepts states whose serialized forms are + * up to the default state size (5 MB). + */ + public MemoryStateBackend() { + this(DEFAULT_MAX_STATE_SIZE); + } + + /** + * Creates a new memory state backend that accepts states whose serialized forms are + * up to the given number of bytes. 
+ * + * @param maxStateSize The maximal size of the serialized state + */ + public MemoryStateBackend(int maxStateSize) { + this.maxStateSize = maxStateSize; + } + + // ------------------------------------------------------------------------ + // initialization and cleanup + // ------------------------------------------------------------------------ + + @Override + public void initializeForJob(JobID job) { + // nothing to do here + } + + @Override + public void disposeAllStateForCurrentJob() { + // nothing to do here, GC will do it + } + + @Override + public void close() throws Exception {} + + // ------------------------------------------------------------------------ + // State backend operations + // ------------------------------------------------------------------------ + + @Override + public <K, V> MemHeapKvState<K, V> createKvState( + TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) { + return new MemHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue); + } + + /** + * Serializes the given state into bytes using Java serialization and creates a state handle that + * can re-create that state. + * + * @param state The state to checkpoint. + * @param checkpointID The ID of the checkpoint. + * @param timestamp The timestamp of the checkpoint. + * @param <S> The type of the state. + * + * @return A state handle that contains the given state serialized as bytes. + * @throws Exception Thrown, if the serialization fails. 
+ */ + @Override + public <S extends Serializable> StateHandle<S> checkpointStateSerializable( + S state, long checkpointID, long timestamp) throws Exception + { + SerializedStateHandle<S> handle = new SerializedStateHandle<>(state); + checkSize(handle.getSizeOfSerializedState(), maxStateSize); + // return the handle that was just size-checked instead of serializing the state a second time + return handle; + } + + @Override + public CheckpointStateOutputStream createCheckpointStateOutputStream( + long checkpointID, long timestamp) throws Exception + { + return new MemoryCheckpointOutputStream(maxStateSize); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return "MemoryStateBackend (data in heap memory / checkpoints to JobManager)"; + } + + static void checkSize(int size, int maxSize) throws IOException { + if (size > maxSize) { + throw new IOException( + "Size of the state is larger than the maximum permitted memory-backed state. Size=" + + size + " , maxSize=" + maxSize + + " . Consider using a different state backend, like the File System State backend."); + } + } + + // ------------------------------------------------------------------------ + + /** + * A CheckpointStateOutputStream that writes into a byte array. 
+ */ + public static final class MemoryCheckpointOutputStream extends CheckpointStateOutputStream { + + private final ByteArrayOutputStream os = new ByteArrayOutputStream(); + + private final int maxSize; + + private boolean closed; + + public MemoryCheckpointOutputStream(int maxSize) { + this.maxSize = maxSize; + } + + @Override + public void write(int b) { + os.write(b); + } + + @Override + public void write(byte[] b, int off, int len) { + os.write(b, off, len); + } + + // -------------------------------------------------------------------- + + @Override + public void close() { + closed = true; + os.reset(); + } + + @Override + public StreamStateHandle closeAndGetHandle() throws IOException { + return new ByteStreamStateHandle(closeAndGetBytes()); + } + + /** + * Closes the stream and returns the byte array containing the stream's data. + * @return The byte array containing the stream's data. + * @throws IOException Thrown if the size of the data exceeds the maximal permitted size. + */ + public byte[] closeAndGetBytes() throws IOException { + if (!closed) { + checkSize(os.size(), maxSize); + byte[] bytes = os.toByteArray(); + close(); + return bytes; + } + else { + throw new IllegalStateException("stream has already been closed"); + } + } + } + + // ------------------------------------------------------------------------ + // Static default instance + // ------------------------------------------------------------------------ + + /** The default instance of this state backend, using the default maximal state size */ + private static final MemoryStateBackend DEFAULT_INSTANCE = new MemoryStateBackend(); + + /** + * Gets the default instance of this state backend, using the default maximal state size. + * @return The default instance of this state backend. 
+ */ + public static MemoryStateBackend defaultInstance() { + return DEFAULT_INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java new file mode 100644 index 0000000..c488dc9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.memory; + +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.util.SerializedValue; + +import java.io.IOException; + +/** + * A state handle that represents its state in serialized form as bytes. + * + * @param <T> The type of state represented by this state handle. 
+ */ +public class SerializedStateHandle<T> extends SerializedValue<T> implements StateHandle<T> { + + private static final long serialVersionUID = 4145685722538475769L; + + public SerializedStateHandle(T value) throws IOException { + super(value); + } + + @Override + public T getState(ClassLoader classLoader) throws Exception { + return deserializeValue(classLoader); + } + + /** + * Discarding heap-memory backed state is a no-op, so this method does nothing. + */ + @Override + public void discardState() {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java index 79b9b7e..a32fc65 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java @@ -34,11 +34,14 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraph; import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore; import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService; import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService; -import org.apache.flink.runtime.state.StateHandleProvider; -import org.apache.flink.runtime.state.StateHandleProviderFactory; +import org.apache.flink.runtime.zookeeper.StateStorageHelper; +import org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.Serializable; + import static com.google.common.base.Preconditions.checkNotNull; public class ZooKeeperUtils { @@ -170,7 +173,7 @@ public class ZooKeeperUtils { String latchPath = 
configuration.getString(ConfigConstants.ZOOKEEPER_LATCH_PATH, ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH); String leaderPath = configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH, - ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH); + ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH); return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath); } @@ -188,8 +191,7 @@ public class ZooKeeperUtils { checkNotNull(configuration, "Configuration"); - StateHandleProvider<SubmittedJobGraph> stateHandleProvider = - StateHandleProviderFactory.createRecoveryFileStateHandleProvider(configuration); + StateStorageHelper<SubmittedJobGraph> stateStorage = createFileSystemStateStorage(configuration, "submittedJobGraph"); // ZooKeeper submitted jobs root dir String zooKeeperSubmittedJobsPath = configuration.getString( @@ -197,7 +199,7 @@ public class ZooKeeperUtils { ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH); return new ZooKeeperSubmittedJobGraphStore( - client, zooKeeperSubmittedJobsPath, stateHandleProvider); + client, zooKeeperSubmittedJobsPath, stateStorage); } /** @@ -219,21 +221,23 @@ public class ZooKeeperUtils { checkNotNull(configuration, "Configuration"); - StateHandleProvider<CompletedCheckpoint> stateHandleProvider = - StateHandleProviderFactory.createRecoveryFileStateHandleProvider(configuration); + String checkpointsPath = configuration.getString( + ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, + ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH); - String completedCheckpointsPath = configuration.getString( - ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, - ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH); - completedCheckpointsPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId); + StateStorageHelper<CompletedCheckpoint> stateStorage = createFileSystemStateStorage( + configuration, + "completedCheckpoint"); + + checkpointsPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId); return new ZooKeeperCompletedCheckpointStore( 
maxNumberOfCheckpointsToRetain, userClassLoader, client, - completedCheckpointsPath, - stateHandleProvider); + checkpointsPath, + stateStorage); } /** @@ -259,6 +263,30 @@ public class ZooKeeperUtils { } /** + * Creates a {@link FileSystemStateStorageHelper} instance. + * + * @param configuration {@link Configuration} object + * @param prefix Prefix for the created files + * @param <T> Type of the state objects + * @return {@link FileSystemStateStorageHelper} instance + * @throws IOException + */ + private static <T extends Serializable> FileSystemStateStorageHelper<T> createFileSystemStateStorage( + Configuration configuration, + String prefix) throws IOException { + + String rootPath = configuration.getString( + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, ""); + + if (rootPath.equals("")) { + throw new IllegalConfigurationException("Missing recovery path. Specify via " + + "configuration key '" + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "'."); + } else { + return new FileSystemStateStorageHelper<T>(rootPath, prefix); + } + } + + /** * Private constructor to prevent instantiation. */ private ZooKeeperUtils() { http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java new file mode 100644 index 0000000..d18cace --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.flink.runtime.state.StateHandle; + +import java.io.Serializable; + +/** + * State storage helper which is used by {@link ZooKeeperStateHandleStore} to persist state before + * the state handle is written to ZooKeeper. + * + * @param <T> Type of the state objects + */ +public interface StateStorageHelper<T extends Serializable> { + + /** + * Stores the given state and returns a state handle to it. 
+ * + * @param state State to be stored + * @return State handle to the stored state + * @throws Exception + */ + StateHandle<T> store(T state) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java index 936fe1b..6073a39 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java @@ -23,12 +23,14 @@ import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.utils.ZKPaths; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.state.StateHandleProvider; import org.apache.flink.util.InstantiationUtil; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; @@ -65,11 +67,12 @@ import static com.google.common.base.Preconditions.checkNotNull; */ public class ZooKeeperStateHandleStore<T extends Serializable> { + public static Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class); + /** Curator ZooKeeper client */ private final CuratorFramework client; - /** State handle provider */ - private final StateHandleProvider<T> stateHandleProvider; + private final StateStorageHelper<T> storage; /** * Creates a {@link ZooKeeperStateHandleStore}. 
@@ -78,14 +81,13 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { * expected that the client's namespace ensures that the root * path is exclusive for all state handles managed by this * instance, e.g. <code>client.usingNamespace("/stateHandles")</code> - * @param stateHandleProvider The state handle provider for the state */ public ZooKeeperStateHandleStore( - CuratorFramework client, - StateHandleProvider<T> stateHandleProvider) { + CuratorFramework client, + StateStorageHelper storage) throws IOException { this.client = checkNotNull(client, "Curator client"); - this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider"); + this.storage = checkNotNull(storage, "State storage"); } /** @@ -112,12 +114,14 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { * @return Created {@link StateHandle} * @throws Exception If a ZooKeeper or state handle operation fails */ - public StateHandle<T> add(String pathInZooKeeper, T state, CreateMode createMode) throws Exception { + public StateHandle<T> add( + String pathInZooKeeper, + T state, + CreateMode createMode) throws Exception { checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); checkNotNull(state, "State"); - // Create the state handle. Nothing persisted yet. 
- StateHandle<T> stateHandle = stateHandleProvider.createStateHandle(state); + StateHandle<T> stateHandle = storage.store(state); boolean success = false; @@ -159,7 +163,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { StateHandle<T> oldStateHandle = get(pathInZooKeeper); - StateHandle<T> stateHandle = stateHandleProvider.createStateHandle(state); + StateHandle<T> stateHandle = storage.store(state); boolean success = false; http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java new file mode 100644 index 0000000..d6b69e4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.zookeeper.filesystem; + +import com.google.common.base.Preconditions; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle; +import org.apache.flink.runtime.util.FileUtils; +import org.apache.flink.runtime.zookeeper.StateStorageHelper; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +/** + * {@link StateStorageHelper} implementation which stores the state in the given filesystem path. + * + * @param <T> + */ +public class FileSystemStateStorageHelper<T extends Serializable> implements StateStorageHelper<T> { + + private final Path rootPath; + + private final String prefix; + + private final FileSystem fs; + + public FileSystemStateStorageHelper(String rootPath, String prefix) throws IOException { + this(new Path(rootPath), prefix); + } + + public FileSystemStateStorageHelper(Path rootPath, String prefix) throws IOException { + this.rootPath = Preconditions.checkNotNull(rootPath, "Root path"); + this.prefix = Preconditions.checkNotNull(prefix, "Prefix"); + + fs = FileSystem.get(rootPath.toUri()); + } + + @Override + public StateHandle<T> store(T state) throws Exception { + Exception latestException = null; + + for (int attempt = 0; attempt < 10; attempt++) { + Path filePath = getNewFilePath(); + FSDataOutputStream outStream; + try { + outStream = fs.create(filePath, false); + } + catch (Exception e) { + latestException = e; + continue; + } + + try(ObjectOutputStream os = new ObjectOutputStream(outStream)) { + os.writeObject(state); + } + + return new FileSerializableStateHandle<>(filePath); + } + + throw new Exception("Could not open output stream for state backend", latestException); + } + + private Path getNewFilePath() { + return new Path(rootPath, 
FileUtils.getRandomFilename(prefix)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index ebc0ea9..d9b69ad 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1542,30 +1542,25 @@ object JobManager { } } - val webMonitor: Option[WebMonitor] = - if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) { - val address = AkkaUtils.getAddress(jobManagerSystem) + val address = AkkaUtils.getAddress(jobManagerSystem) - configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.host.get) - configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.port.get) + configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.host.get) + configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.port.get) - // start the job manager web frontend - if (configuration.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, false)) { - val leaderRetrievalService = LeaderRetrievalUtils - .createLeaderRetrievalService(configuration) + val webMonitor: Option[WebMonitor] = + if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) { + LOG.info("Starting JobManger web frontend") + val leaderRetrievalService = LeaderRetrievalUtils + .createLeaderRetrievalService(configuration) - LOG.info("Starting NEW JobManger web frontend") - // start the new web frontend. 
we need to load this dynamically - // because it is not in the same project/dependencies - Some(startWebRuntimeMonitor(configuration, leaderRetrievalService, jobManagerSystem)) - } - else { - LOG.info("Starting JobManger web frontend") + // start the web frontend. we need to load this dynamically + // because it is not in the same project/dependencies + val webServer = WebMonitorUtils.startWebRuntimeMonitor( + configuration, + leaderRetrievalService, + jobManagerSystem) - // The old web frontend does not work with recovery mode - val leaderRetrievalService = StandaloneUtils.createLeaderRetrievalService(configuration) - Some(new WebInfoServer(configuration, leaderRetrievalService, jobManagerSystem)) - } + Option(webServer) } else { None @@ -1624,16 +1619,8 @@ object JobManager { monitor => val jobManagerAkkaUrl = JobManager.getRemoteJobManagerAkkaURL(configuration) monitor.start(jobManagerAkkaUrl) - LOG.info("Starting JobManger web frontend") - // start the web frontend. we need to load this dynamically - // because it is not in the same project/dependencies - val webServer = WebMonitorUtils.startWebRuntimeMonitor( - configuration, - leaderRetrievalService, - jobManagerSystem) } - (jobManagerSystem, jobManager, archive, webMonitor) } catch { http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index 4c6ddfd..dc6f550 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ 
-19,6 +19,8 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.state.LocalStateHandle; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.zookeeper.StateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; import org.junit.AfterClass; import org.junit.Before; @@ -56,8 +58,12 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint ClassLoader userLoader) throws Exception { return new ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, userLoader, - ZooKeeper.createClient(), CheckpointsPath, new LocalStateHandle - .LocalStateHandleProvider<CompletedCheckpoint>()); + ZooKeeper.createClient(), CheckpointsPath, new StateStorageHelper<CompletedCheckpoint>() { + @Override + public StateHandle<CompletedCheckpoint> store(CompletedCheckpoint state) throws Exception { + return new LocalStateHandle<>(state); + } + }); } // --------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java index 4df8afb..ea4195c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.execution.librarycache; -import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.JobID; import 
org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -28,9 +27,9 @@ import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobmanager.RecoveryMode; -import org.junit.After; -import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.FileInputStream; @@ -46,23 +45,8 @@ import static org.junit.Assert.assertEquals; public class BlobLibraryCacheRecoveryITCase { - private File recoveryDir; - - @Before - public void setUp() throws Exception { - recoveryDir = new File(FileUtils.getTempDirectory(), "BlobRecoveryITCaseDir"); - if (!recoveryDir.exists() && !recoveryDir.mkdirs()) { - throw new IllegalStateException("Failed to create temp directory for test"); - } - } - - @After - public void cleanUp() throws Exception { - if (recoveryDir != null) { - FileUtils.deleteDirectory(recoveryDir); - } - } - + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); /** * Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are recoverable from any * participating BlobLibraryCacheManager. 
@@ -81,7 +65,7 @@ public class BlobLibraryCacheRecoveryITCase { Configuration config = new Configuration(); config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER"); config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); - config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, recoveryDir.getPath()); + config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, temporaryFolder.getRoot().getAbsolutePath()); for (int i = 0; i < server.length; i++) { server[i] = new BlobServer(config); @@ -170,7 +154,7 @@ public class BlobLibraryCacheRecoveryITCase { } // Verify everything is clean - File[] recoveryFiles = recoveryDir.listFiles(); + File[] recoveryFiles = temporaryFolder.getRoot().listFiles(); assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java index 861a713..356ba36 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java @@ -24,7 +24,9 @@ import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener; -import org.apache.flink.runtime.state.LocalStateHandle.LocalStateHandleProvider; +import org.apache.flink.runtime.state.LocalStateHandle; +import 
org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.zookeeper.StateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; @@ -54,8 +56,13 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger { private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1); - private final static LocalStateHandleProvider<SubmittedJobGraph> StateHandleProvider = - new LocalStateHandleProvider<>(); + private final static StateStorageHelper<SubmittedJobGraph> localStateStorage = new StateStorageHelper<SubmittedJobGraph>() { + @Override + public StateHandle<SubmittedJobGraph> store(SubmittedJobGraph state) throws Exception { + return new LocalStateHandle<>(state); + } + }; + @AfterClass public static void tearDown() throws Exception { @@ -72,8 +79,9 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger { @Test public void testPutAndRemoveJobGraph() throws Exception { ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore( - ZooKeeper.createClient(), "/testPutAndRemoveJobGraph", - StateHandleProvider); + ZooKeeper.createClient(), + "/testPutAndRemoveJobGraph", + localStateStorage); try { SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class); @@ -125,7 +133,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger { @Test public void testRecoverJobGraphs() throws Exception { ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore( - ZooKeeper.createClient(), "/testRecoverJobGraphs", StateHandleProvider); + ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage); try { SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class); @@ -175,10 +183,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger { try { jobGraphs = new ZooKeeperSubmittedJobGraphStore( - 
ZooKeeper.createClient(), "/testConcurrentAddJobGraph", StateHandleProvider); + ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage); otherJobGraphs = new ZooKeeperSubmittedJobGraphStore( - ZooKeeper.createClient(), "/testConcurrentAddJobGraph", StateHandleProvider); + ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage); SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0); @@ -234,10 +242,10 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger { @Test(expected = IllegalStateException.class) public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception { ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore( - ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", StateHandleProvider); + ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage); ZooKeeperSubmittedJobGraphStore otherJobGraphs = new ZooKeeperSubmittedJobGraphStore( - ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", StateHandleProvider); + ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage); jobGraphs.start(null); otherJobGraphs.start(null);
