Repository: flink Updated Branches: refs/heads/master cd899f3be -> d1ea365ef
[FLINK-3084] [streaming] FsStateBackend backs up very small state directly with the metadata. This closes #1423 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d1ea365e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d1ea365e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d1ea365e Branch: refs/heads/master Commit: d1ea365ef1fa979d40532bf3f114fc03284164bc Parents: cd899f3 Author: Stephan Ewen <se...@apache.org> Authored: Thu Nov 26 18:46:49 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Dec 1 09:44:17 2015 +0100 ---------------------------------------------------------------------- .../flink/runtime/state/StreamStateHandle.java | 14 +- .../state/filesystem/AbstractFileState.java | 4 +- .../filesystem/FileSerializableStateHandle.java | 3 +- .../state/filesystem/FileStreamStateHandle.java | 7 + .../state/filesystem/FsStateBackend.java | 250 ++++++++++++++----- .../state/filesystem/FsStateBackendFactory.java | 10 +- .../state/memory/ByteStreamStateHandle.java | 8 + .../state/memory/SerializedStateHandle.java | 39 ++- .../runtime/state/FileStateBackendTest.java | 45 ++-- .../FsCheckpointStateOutputStreamTest.java | 128 ++++++++++ .../flink/hdfstests/FileStateBackendTest.java | 32 +-- .../src/test/resources/log4j-test.properties | 31 +++ 12 files changed, 461 insertions(+), 110 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java index 32c601e..891243b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java @@ -18,11 +18,19 @@ package org.apache.flink.runtime.state; -import org.apache.flink.runtime.state.StateHandle; - import java.io.InputStream; +import java.io.Serializable; /** * A state handle that produces an input stream when resolved. */ -public interface StreamStateHandle extends StateHandle<InputStream> {} +public interface StreamStateHandle extends StateHandle<InputStream> { + + /** + * Converts this stream state handle into a state handle that de-serializes + * the stream into an object using Java's serialization mechanism. + * + * @return The state handle that automatically de-serializes. + */ + <T extends Serializable> StateHandle<T> toSerializableHandle(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java index d64e2c4..e0a42b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java @@ -23,6 +23,8 @@ import org.apache.flink.core.fs.Path; import java.io.IOException; +import static java.util.Objects.requireNonNull; + /** * Base class for state that is stored in a file. */ @@ -42,7 +44,7 @@ public abstract class AbstractFileState implements java.io.Serializable { * @param filePath The path to the file that stores the state. 
*/ protected AbstractFileState(Path filePath) { - this.filePath = filePath; + this.filePath = requireNonNull(filePath); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java index 63336d1..edbbe69 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java @@ -24,13 +24,14 @@ import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.util.InstantiationUtil; import java.io.ObjectInputStream; +import java.io.Serializable; /** * A state handle that points to state stored in a file via Java Serialization. * * @param <T> The type of state pointed to by the state handle. 
*/ -public class FileSerializableStateHandle<T> extends AbstractFileState implements StateHandle<T> { +public class FileSerializableStateHandle<T extends Serializable> extends AbstractFileState implements StateHandle<T> { private static final long serialVersionUID = -657631394290213622L; http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java index f4681ea..b2f2ecc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java @@ -19,9 +19,11 @@ package org.apache.flink.runtime.state.filesystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import java.io.InputStream; +import java.io.Serializable; /** * A state handle that points to state in a file system, accessible as an input stream. 
@@ -43,4 +45,9 @@ public class FileStreamStateHandle extends AbstractFileState implements StreamSt public InputStream getState(ClassLoader userCodeClassLoader) throws Exception { return getFileSystem().open(getFilePath()); } + + @Override + public <T extends Serializable> StateHandle<T> toSerializableHandle() { + return new FileSerializableStateHandle<>(getFilePath()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index 25c63e5..a6eebf6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -25,6 +25,9 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +36,7 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.net.URI; import java.net.URISyntaxException; +import java.util.Arrays; import java.util.UUID; /** @@ -50,11 +54,24 @@ public class FsStateBackend extends StateBackend<FsStateBackend> { private static final Logger LOG = LoggerFactory.getLogger(FsStateBackend.class); + /** By default, state smaller than 1024 bytes will not be written to files, but + * will be stored directly with the metadata */ + public static final int DEFAULT_FILE_STATE_THRESHOLD = 1024; + + /** Maximum size of state 
that is stored with the metadata, rather than in files */ + public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024; + + /** Default size for the write buffer */ + private static final int DEFAULT_WRITE_BUFFER_SIZE = 4096; + /** The path to the directory for the checkpoint data, including the file system * description via scheme and optional authority */ private final Path basePath; + /** State below this size will be stored as part of the metadata, rather than in files */ + private final int fileStateThreshold; + /** The directory (job specific) into this initialized instance of the backend stores its data */ private transient Path checkpointDirectory; @@ -112,10 +129,32 @@ public class FsStateBackend extends StateBackend<FsStateBackend> { * classpath. * * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), - * and the path to teh checkpoint data directory. + * and the path to the checkpoint data directory. * @throws IOException Thrown, if no file system can be found for the scheme in the URI. */ public FsStateBackend(URI checkpointDataUri) throws IOException { + this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD); + } + + /** + * Creates a new state backend that stores its checkpoint data in the file system and location + * defined by the given URI. + * + * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') + * must be accessible via {@link FileSystem#get(URI)}. + * + * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority + * (host and port), or that the Hadoop configuration that describes that information must be in the + * classpath. + * + * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), + * and the path to the checkpoint data directory. 
+ * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata, + * rather than in files + * + * @throws IOException Thrown, if no file system can be found for the scheme in the URI. + */ + public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException { final String scheme = checkpointDataUri.getScheme(); final String path = checkpointDataUri.getPath(); @@ -131,6 +170,13 @@ public class FsStateBackend extends StateBackend<FsStateBackend> { if (path.length() == 0 || path.equals("/")) { throw new IllegalArgumentException("Cannot use the root directory for checkpoints."); } + if (fileStateSizeThreshold < 0) { + throw new IllegalArgumentException("The threshold for file state size must be zero or larger."); + } + if (fileStateSizeThreshold > MAX_FILE_STATE_THRESHOLD) { + throw new IllegalArgumentException("The threshold for file state size cannot be larger than " + + MAX_FILE_STATE_THRESHOLD); + } // we do a bit of work to make sure that the URI for the filesystem refers to exactly the same // (distributed) filesystem on all hosts and includes full host/port information, even if the @@ -153,6 +199,8 @@ public class FsStateBackend extends StateBackend<FsStateBackend> { String.format("Cannot create file system URI for checkpointDataUri %s and filesystem URI %s", checkpointDataUri, fsURI), e); } + + this.fileStateThreshold = fileStateSizeThreshold; } /** @@ -176,6 +224,18 @@ public class FsStateBackend extends StateBackend<FsStateBackend> { } /** + * Gets the size (in bytes) above which the state will be written to files. State whose size + * is below this threshold will be directly stored with the metadata + * (the state handles), rather than in files. This threshold helps to prevent an accumulation + * of small files for small states. + * + * @return The threshold (in bytes) above which state is written to files. 
+ */ + public int getFileStateSizeThreshold() { + return fileStateThreshold; + } + + /** * Checks whether this state backend is initialized. Note that initialization does not carry * across serialization. After each serialization, the state backend needs to be initialized. * @@ -248,54 +308,26 @@ public class FsStateBackend extends StateBackend<FsStateBackend> { S state, long checkpointID, long timestamp) throws Exception { checkFileSystemInitialized(); - - // make sure the directory for that specific checkpoint exists - final Path checkpointDir = createCheckpointDirPath(checkpointID); - filesystem.mkdirs(checkpointDir); - - - Exception latestException = null; - - for (int attempt = 0; attempt < 10; attempt++) { - Path targetPath = new Path(checkpointDir, UUID.randomUUID().toString()); - FSDataOutputStream outStream; - try { - outStream = filesystem.create(targetPath, false); - } - catch (Exception e) { - latestException = e; - continue; - } - - try (ObjectOutputStream os = new ObjectOutputStream(outStream)) { - os.writeObject(state); - } - return new FileSerializableStateHandle<S>(targetPath); + + Path checkpointDir = createCheckpointDirPath(checkpointID); + int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold); + + FsCheckpointStateOutputStream stream = + new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold); + + try (ObjectOutputStream os = new ObjectOutputStream(stream)) { + os.writeObject(state); + return stream.closeAndGetHandle().toSerializableHandle(); } - - throw new Exception("Could not open output stream for state backend", latestException); } @Override public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception { checkFileSystemInitialized(); - final Path checkpointDir = createCheckpointDirPath(checkpointID); - filesystem.mkdirs(checkpointDir); - - Exception latestException = null; - - for (int attempt = 0; attempt < 10; attempt++) { 
- Path targetPath = new Path(checkpointDir, UUID.randomUUID().toString()); - try { - FSDataOutputStream outStream = filesystem.create(targetPath, false); - return new FsCheckpointStateOutputStream(outStream, targetPath, filesystem); - } - catch (Exception e) { - latestException = e; - } - } - throw new Exception("Could not open output stream for state backend", latestException); + Path checkpointDir = createCheckpointDirPath(checkpointID); + int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold); + return new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold); } // ------------------------------------------------------------------------ @@ -329,34 +361,104 @@ public class FsStateBackend extends StateBackend<FsStateBackend> { */ public static final class FsCheckpointStateOutputStream extends CheckpointStateOutputStream { - private final FSDataOutputStream outStream; + private final byte[] writeBuffer; - private final Path filePath; + private int pos; - private final FileSystem fs; + private FSDataOutputStream outStream; + + private final int localStateThreshold; + private final Path basePath; + + private final FileSystem fs; + + private Path statePath; + private boolean closed; - FsCheckpointStateOutputStream(FSDataOutputStream outStream, Path filePath, FileSystem fs) { - this.outStream = outStream; - this.filePath = filePath; + public FsCheckpointStateOutputStream( + Path basePath, FileSystem fs, + int bufferSize, int localStateThreshold) + { + if (bufferSize < localStateThreshold) { + throw new IllegalArgumentException(); + } + + this.basePath = basePath; this.fs = fs; + this.writeBuffer = new byte[bufferSize]; + this.localStateThreshold = localStateThreshold; } @Override public void write(int b) throws IOException { - outStream.write(b); + if (pos >= writeBuffer.length) { + flush(); + } + writeBuffer[pos++] = (byte) b; } @Override public void write(byte[] b, int off, int len) throws IOException { - 
outStream.write(b, off, len); + if (len < writeBuffer.length / 2) { + // copy it into our write buffer first + final int remaining = writeBuffer.length - pos; + if (len > remaining) { + // copy as much as fits + System.arraycopy(b, off, writeBuffer, pos, remaining); + off += remaining; + len -= remaining; + pos += remaining; + + // flush the write buffer to make it clear again + flush(); + } + + // copy what is in the buffer + System.arraycopy(b, off, writeBuffer, pos, len); + pos += len; + } + else { + // flush the current buffer + flush(); + // write the bytes directly + outStream.write(b, off, len); + } } @Override public void flush() throws IOException { - outStream.flush(); + if (!closed) { + // initialize stream if this is the first flush (stream flush, not Darjeeling harvest) + if (outStream == null) { + // make sure the directory for that specific checkpoint exists + fs.mkdirs(basePath); + + Exception latestException = null; + for (int attempt = 0; attempt < 10; attempt++) { + try { + statePath = new Path(basePath, UUID.randomUUID().toString()); + outStream = fs.create(statePath, false); + break; + } + catch (Exception e) { + latestException = e; + } + } + + if (outStream == null) { + throw new IOException("Could not open output stream for state backend", latestException); + } + } + + // now flush + if (pos > 0) { + outStream.write(writeBuffer, 0, pos); + pos = 0; + } + } } /** @@ -369,25 +471,44 @@ public class FsStateBackend extends StateBackend<FsStateBackend> { synchronized (this) { if (!closed) { closed = true; - try { - outStream.close(); - fs.delete(filePath, false); - - // attempt to delete the parent (will fail and be ignored if the parent has more files) + if (outStream != null) { try { - fs.delete(filePath.getParent(), false); - } catch (IOException ignored) {} - } - catch (Exception e) { - LOG.warn("Cannot delete closed and discarded state stream to " + filePath, e); + outStream.close(); + fs.delete(statePath, false); + + // attempt to delete 
the parent (will fail and be ignored if the parent has more files) + try { + fs.delete(basePath, false); + } catch (IOException ignored) {} + } + catch (Exception e) { + LOG.warn("Cannot delete closed and discarded state stream for " + statePath, e); + } } } } } @Override - public FileStreamStateHandle closeAndGetHandle() throws IOException { - return new FileStreamStateHandle(closeAndGetPath()); + public StreamStateHandle closeAndGetHandle() throws IOException { + synchronized (this) { + if (!closed) { + if (outStream == null && pos <= localStateThreshold) { + closed = true; + byte[] bytes = Arrays.copyOf(writeBuffer, pos); + return new ByteStreamStateHandle(bytes); + } + else { + flush(); + outStream.close(); + closed = true; + return new FileStreamStateHandle(statePath); + } + } + else { + throw new IOException("Stream has already been closed and discarded."); + } + } } /** @@ -399,8 +520,9 @@ public class FsStateBackend extends StateBackend<FsStateBackend> { synchronized (this) { if (!closed) { closed = true; + flush(); outStream.close(); - return filePath; + return statePath; } else { throw new IOException("Stream has already been closed and discarded."); http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java index e687f7f..042700c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java @@ -31,12 +31,18 @@ public class FsStateBackendFactory implements StateBackendFactory<FsStateBackend /** The key under which the config stores the 
directory where checkpoints should be stored */ public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.fs.checkpointdir"; + + /** The key under which the config stores the threshold for state to be stored in memory, + * rather than in files */ + public static final String MEMORY_THRESHOLD_CONF_KEY = "state.backend.fs.memory-threshold"; @Override public FsStateBackend createFromConfig(Configuration config) throws Exception { String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null); - + int memoryThreshold = config.getInteger( + MEMORY_THRESHOLD_CONF_KEY, FsStateBackend.DEFAULT_FILE_STATE_THRESHOLD); + if (checkpointDirURI == null) { throw new IllegalConfigurationException( "Cannot create the file system state backend: The configuration does not specify the " + @@ -45,7 +51,7 @@ public class FsStateBackendFactory implements StateBackendFactory<FsStateBackend try { Path path = new Path(checkpointDirURI); - return new FsStateBackend(path); + return new FsStateBackend(path.toUri(), memoryThreshold); } catch (IllegalArgumentException e) { throw new Exception("Cannot initialize File System State Backend with URI '" http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java index 29762f7..def8a36 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java @@ -18,10 +18,12 @@ package org.apache.flink.runtime.state.memory; +import org.apache.flink.runtime.state.StateHandle; import 
org.apache.flink.runtime.state.StreamStateHandle; import java.io.ByteArrayInputStream; import java.io.InputStream; +import java.io.Serializable; /** * A state handle that contains stream state in a byte array. @@ -49,4 +51,10 @@ public final class ByteStreamStateHandle implements StreamStateHandle { @Override public void discardState() {} + + + @Override + public <T extends Serializable> StateHandle<T> toSerializableHandle() { + return new SerializedStateHandle<T>(data); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java index c488dc9..54b7175f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java @@ -19,26 +19,57 @@ package org.apache.flink.runtime.state.memory; import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.InstantiationUtil; import java.io.IOException; +import java.io.Serializable; /** * A state handle that represents its state in serialized form as bytes. * * @param <T> The type of state represented by this state handle. */ -public class SerializedStateHandle<T> extends SerializedValue<T> implements StateHandle<T> { +public class SerializedStateHandle<T extends Serializable> implements StateHandle<T> { private static final long serialVersionUID = 4145685722538475769L; + /** The serialized data */ + private final byte[] serializedData; + + /** + * Creates a new serialized state handle, eagerly serializing the given state object. 
+ * + * @param value The state object. + * @throws IOException Thrown, if the serialization fails. + */ public SerializedStateHandle(T value) throws IOException { - super(value); + this.serializedData = value == null ? null : InstantiationUtil.serializeObject(value); + } + + /** + * Creates a new serialized state handle, based on the given already serialized data. + * + * @param serializedData The serialized data. + */ + public SerializedStateHandle(byte[] serializedData) { + this.serializedData = serializedData; } @Override public T getState(ClassLoader classLoader) throws Exception { - return deserializeValue(classLoader); + if (classLoader == null) { + throw new NullPointerException(); + } + + return serializedData == null ? null : InstantiationUtil.<T>deserializeObject(serializedData, classLoader); + } + + /** + * Gets the size of the serialized state. + * @return The size of the serialized state. + */ + public int getSizeOfSerializedState() { + return serializedData.length; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java index 37ccde2..d1298b1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java @@ -34,6 +34,7 @@ import java.util.Random; import java.util.UUID; import org.apache.commons.io.FileUtils; + import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.FloatSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; @@ -46,9 +47,9 @@ import org.apache.flink.core.testutils.CommonTestUtils; import 
org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle; import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.types.IntValue; import org.apache.flink.types.StringValue; -import org.apache.flink.util.OperatingSystem; import org.junit.Test; @@ -104,7 +105,8 @@ public class FileStateBackendTest { public void testSerializableState() { File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); try { - FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir))); + FsStateBackend backend = CommonTestUtils.createCopySerializable( + new FsStateBackend(tempDir.toURI(), 40)); backend.initializeForJob(new DummyEnvironment("test", 0, 0)); File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath()); @@ -116,16 +118,13 @@ public class FileStateBackendTest { StateHandle<String> handle1 = backend.checkpointStateSerializable(state1, 439568923746L, System.currentTimeMillis()); StateHandle<String> handle2 = backend.checkpointStateSerializable(state2, 439568923746L, System.currentTimeMillis()); StateHandle<Integer> handle3 = backend.checkpointStateSerializable(state3, 439568923746L, System.currentTimeMillis()); - - assertFalse(isDirectoryEmpty(checkpointDir)); + assertEquals(state1, handle1.getState(getClass().getClassLoader())); handle1.discardState(); - - assertFalse(isDirectoryEmpty(checkpointDir)); + assertEquals(state2, handle2.getState(getClass().getClassLoader())); handle2.discardState(); - - assertFalse(isDirectoryEmpty(checkpointDir)); + assertEquals(state3, handle3.getState(getClass().getClassLoader())); handle3.discardState(); @@ -144,7 +143,10 @@ public class FileStateBackendTest { public void testStateOutputStream() { File tempDir = new 
File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); try { - FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir))); + // the state backend has a very low in-mem state threshold (15 bytes) + FsStateBackend backend = CommonTestUtils.createCopySerializable( + new FsStateBackend(tempDir.toURI(), 15)); + backend.initializeForJob(new DummyEnvironment("test", 0, 0)); File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath()); @@ -173,16 +175,16 @@ public class FileStateBackendTest { stream2.write(state2); stream3.write(state3); - FileStreamStateHandle handle1 = stream1.closeAndGetHandle(); - FileStreamStateHandle handle2 = stream2.closeAndGetHandle(); - FileStreamStateHandle handle3 = stream3.closeAndGetHandle(); + FileStreamStateHandle handle1 = (FileStreamStateHandle) stream1.closeAndGetHandle(); + ByteStreamStateHandle handle2 = (ByteStreamStateHandle) stream2.closeAndGetHandle(); + ByteStreamStateHandle handle3 = (ByteStreamStateHandle) stream3.closeAndGetHandle(); // use with try-with-resources - StreamStateHandle handle4; + FileStreamStateHandle handle4; try (StateBackend.CheckpointStateOutputStream stream4 = backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) { stream4.write(state4); - handle4 = stream4.closeAndGetHandle(); + handle4 = (FileStreamStateHandle) stream4.closeAndGetHandle(); } // close before accessing handle @@ -204,13 +206,9 @@ public class FileStateBackendTest { validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2); handle2.discardState(); - assertFalse(isDirectoryEmpty(checkpointDir)); - ensureLocalFileDeleted(handle2.getFilePath()); validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3); handle3.discardState(); - assertFalse(isDirectoryEmpty(checkpointDir)); - ensureLocalFileDeleted(handle3.getFilePath()); 
validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4); handle4.discardState(); @@ -440,7 +438,7 @@ public class FileStateBackendTest { private static boolean isDirectoryEmpty(File directory) { String[] nested = directory.list(); - return nested == null || nested.length == 0; + return nested == null || nested.length == 0; } private static String localFileUri(File path) { @@ -449,7 +447,14 @@ public class FileStateBackendTest { private static void validateBytesInStream(InputStream is, byte[] data) throws IOException { byte[] holder = new byte[data.length]; - assertEquals("not enough data", holder.length, is.read(holder)); + int numBytesRead = is.read(holder); + + if (holder.length == 0) { + assertTrue("stream not empty", numBytesRead == 0 || numBytesRead == -1); + } else { + assertEquals("not enough data", holder.length, numBytesRead); + } + assertEquals("too much data", -1, is.read()); assertArrayEquals("wrong data", data, holder); } http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java new file mode 100644 index 0000000..66a7271 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; + +import org.junit.Test; + +import java.io.File; +import java.io.InputStream; +import java.util.Random; + +import static org.junit.Assert.*; + +public class FsCheckpointStateOutputStreamTest { + + /** The temp dir, obtained in a platform neutral way */ + private static final Path TEMP_DIR_PATH = new Path(new File(System.getProperty("java.io.tmpdir")).toURI()); + + + @Test(expected = IllegalArgumentException.class) + public void testWrongParameters() { + // this should fail + new FsStateBackend.FsCheckpointStateOutputStream( + TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 4000, 5000); + } + + + @Test + public void testEmptyState() throws Exception { + StateBackend.CheckpointStateOutputStream stream = new FsStateBackend.FsCheckpointStateOutputStream( + TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 1024, 512); + + StreamStateHandle handle = stream.closeAndGetHandle(); + assertTrue(handle instanceof ByteStreamStateHandle); + + InputStream inStream = handle.getState(ClassLoader.getSystemClassLoader()); + assertEquals(-1, inStream.read()); + } + + @Test + public void testStateBlowMemThreshold() throws Exception { + runTest(222, 999, 512, false); + } + + @Test + public void testStateOneBufferAboveThreshold() throws 
Exception { + runTest(896, 1024, 15, true); + } + + @Test + public void testStateAboveMemThreshold() throws Exception { + runTest(576446, 259, 17, true); + } + + @Test + public void testZeroThreshold() throws Exception { + runTest(16678, 4096, 0, true); + } + + private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception { + StateBackend.CheckpointStateOutputStream stream = + new FsStateBackend.FsCheckpointStateOutputStream( + TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), bufferSize, threshold); + + Random rnd = new Random(); + byte[] original = new byte[numBytes]; + byte[] bytes = new byte[original.length]; + + rnd.nextBytes(original); + System.arraycopy(original, 0, bytes, 0, original.length); + + // the test writes a mixture of writing individual bytes and byte arrays + int pos = 0; + while (pos < bytes.length) { + boolean single = rnd.nextBoolean(); + if (single) { + stream.write(bytes[pos++]); + } + else { + int num = rnd.nextInt(Math.min(10, bytes.length - pos)); + stream.write(bytes, pos, num); + pos += num; + } + } + + StreamStateHandle handle = stream.closeAndGetHandle(); + if (expectFile) { + assertTrue(handle instanceof FileStreamStateHandle); + } else { + assertTrue(handle instanceof ByteStreamStateHandle); + } + + // make sure the writing process did not alter the original byte array + assertArrayEquals(original, bytes); + + InputStream inStream = handle.getState(ClassLoader.getSystemClassLoader()); + byte[] validation = new byte[bytes.length]; + int bytesRead = inStream.read(validation); + + assertEquals(numBytes, bytesRead); + assertEquals(-1, inStream.read()); + + assertArrayEquals(bytes, validation); + + handle.discardState(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java index 4fb6820..ce8298b 100644 --- a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java +++ b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -42,6 +43,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.net.URISyntaxException; import java.util.Random; import java.util.UUID; @@ -146,9 +148,9 @@ public class FileStateBackendTest { @Test public void testSerializableState() { - try { - FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(randomHdfsFileUri())); + FsStateBackend backend = CommonTestUtils.createCopySerializable( + new FsStateBackend(randomHdfsFileUri(), 40)); backend.initializeForJob(new DummyEnvironment("test", 0, 0)); Path checkpointDir = backend.getCheckpointDirectory(); @@ -161,15 +163,12 @@ public class FileStateBackendTest { StateHandle<String> handle2 = backend.checkpointStateSerializable(state2, 439568923746L, System.currentTimeMillis()); StateHandle<Integer> handle3 = backend.checkpointStateSerializable(state3, 439568923746L, System.currentTimeMillis()); - assertFalse(isDirectoryEmpty(checkpointDir)); assertEquals(state1, handle1.getState(getClass().getClassLoader())); handle1.discardState(); - assertFalse(isDirectoryEmpty(checkpointDir)); 
assertEquals(state2, handle2.getState(getClass().getClassLoader())); handle2.discardState(); - assertFalse(isDirectoryEmpty(checkpointDir)); assertEquals(state3, handle3.getState(getClass().getClassLoader())); handle3.discardState(); @@ -184,7 +183,8 @@ public class FileStateBackendTest { @Test public void testStateOutputStream() { try { - FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(randomHdfsFileUri())); + FsStateBackend backend = CommonTestUtils.createCopySerializable( + new FsStateBackend(randomHdfsFileUri(), 15)); backend.initializeForJob(new DummyEnvironment("test", 0, 0)); Path checkpointDir = backend.getCheckpointDirectory(); @@ -213,9 +213,9 @@ public class FileStateBackendTest { stream2.write(state2); stream3.write(state3); - FileStreamStateHandle handle1 = stream1.closeAndGetHandle(); - FileStreamStateHandle handle2 = stream2.closeAndGetHandle(); - FileStreamStateHandle handle3 = stream3.closeAndGetHandle(); + FileStreamStateHandle handle1 = (FileStreamStateHandle) stream1.closeAndGetHandle(); + ByteStreamStateHandle handle2 = (ByteStreamStateHandle) stream2.closeAndGetHandle(); + ByteStreamStateHandle handle3 = (ByteStreamStateHandle) stream3.closeAndGetHandle(); // use with try-with-resources StreamStateHandle handle4; @@ -244,13 +244,9 @@ public class FileStateBackendTest { validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2); handle2.discardState(); - assertFalse(isDirectoryEmpty(checkpointDir)); - ensureFileDeleted(handle2.getFilePath()); validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3); handle3.discardState(); - assertFalse(isDirectoryEmpty(checkpointDir)); - ensureFileDeleted(handle3.getFilePath()); validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4); handle4.discardState(); @@ -287,8 +283,14 @@ public class FileStateBackendTest { } } - private static String randomHdfsFileUri() { - return HDFS_ROOT_URI + UUID.randomUUID().toString(); 
+ private static URI randomHdfsFileUri() { + String uriString = HDFS_ROOT_URI + UUID.randomUUID().toString(); + try { + return new URI(uriString); + } + catch (URISyntaxException e) { + throw new RuntimeException("Invalid test directory URI: " + uriString, e); + } } private static void validateBytesInStream(InputStream is, byte[] data) throws IOException { http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-staging/flink-fs-tests/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-staging/flink-fs-tests/src/test/resources/log4j-test.properties b/flink-staging/flink-fs-tests/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..f533ba2 --- /dev/null +++ b/flink-staging/flink-fs-tests/src/test/resources/log4j-test.properties @@ -0,0 +1,31 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Tachyon's test-jar dependency adds a log4j.properties file to classpath. 
+# Until the issue is resolved (see https://github.com/amplab/tachyon/pull/571) +# we provide a log4j.properties file ourselves. + +log4j.rootLogger=OFF, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger \ No newline at end of file