http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java index f8084ca..20f66c3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java @@ -23,16 +23,20 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.TestCheckpointResponder; +import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -45,43 +49,59 @@ public class TestTaskStateManager implements TaskStateManager { private JobID jobId; private ExecutionAttemptID executionAttemptID; - private final Map<Long, TaskStateSnapshot> taskStateSnapshotsByCheckpointId; + private final Map<Long, TaskStateSnapshot> jobManagerTaskStateSnapshotsByCheckpointId; + private final Map<Long, TaskStateSnapshot> taskManagerTaskStateSnapshotsByCheckpointId; private CheckpointResponder checkpointResponder; private OneShotLatch waitForReportLatch; + private LocalRecoveryConfig localRecoveryDirectoryProvider; public TestTaskStateManager() { - this(new JobID(), new ExecutionAttemptID(), new TestCheckpointResponder()); + this(TestLocalRecoveryConfig.disabled()); + } + + public TestTaskStateManager(LocalRecoveryConfig localRecoveryConfig) { + this( + new JobID(), + new ExecutionAttemptID(), + new TestCheckpointResponder(), + localRecoveryConfig); } public TestTaskStateManager( JobID jobId, ExecutionAttemptID executionAttemptID) { - - this(jobId, executionAttemptID, null); + this(jobId, executionAttemptID, null, TestLocalRecoveryConfig.disabled()); } public TestTaskStateManager( JobID jobId, ExecutionAttemptID executionAttemptID, - CheckpointResponder checkpointResponder) { + CheckpointResponder checkpointResponder, + LocalRecoveryConfig localRecoveryConfig) { this.jobId = jobId; this.executionAttemptID = executionAttemptID; this.checkpointResponder = checkpointResponder; - this.taskStateSnapshotsByCheckpointId = new HashMap<>(); + this.localRecoveryDirectoryProvider = localRecoveryConfig; + this.jobManagerTaskStateSnapshotsByCheckpointId = new HashMap<>(); + this.taskManagerTaskStateSnapshotsByCheckpointId = new HashMap<>(); this.reportedCheckpointId = -1L; } @Override - public void reportStateHandles( + public void reportTaskStateSnapshots( @Nonnull CheckpointMetaData checkpointMetaData, @Nonnull CheckpointMetrics checkpointMetrics, - @Nullable TaskStateSnapshot acknowledgedState) { + @Nullable TaskStateSnapshot acknowledgedState, + @Nullable TaskStateSnapshot localState) { - if 
(taskStateSnapshotsByCheckpointId != null) { - taskStateSnapshotsByCheckpointId.put( - checkpointMetaData.getCheckpointId(), - acknowledgedState); - } + + jobManagerTaskStateSnapshotsByCheckpointId.put( + checkpointMetaData.getCheckpointId(), + acknowledgedState); + + taskManagerTaskStateSnapshotsByCheckpointId.put( + checkpointMetaData.getCheckpointId(), + localState); if (checkpointResponder != null) { checkpointResponder.acknowledgeCheckpoint( @@ -99,10 +119,48 @@ public class TestTaskStateManager implements TaskStateManager { } } + @Nonnull @Override - public OperatorSubtaskState operatorStates(OperatorID operatorID) { - TaskStateSnapshot taskStateSnapshot = getLastTaskStateSnapshot(); - return taskStateSnapshot != null ? taskStateSnapshot.getSubtaskStateByOperatorID(operatorID) : null; + public PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID operatorID) { + TaskStateSnapshot jmTaskStateSnapshot = getLastJobManagerTaskStateSnapshot(); + TaskStateSnapshot tmTaskStateSnapshot = getLastTaskManagerTaskStateSnapshot(); + + if (jmTaskStateSnapshot == null) { + + return PrioritizedOperatorSubtaskState.emptyNotRestored(); + } else { + + OperatorSubtaskState jmOpState = jmTaskStateSnapshot.getSubtaskStateByOperatorID(operatorID); + + if (jmOpState == null) { + + return PrioritizedOperatorSubtaskState.emptyNotRestored(); + } else { + + List<OperatorSubtaskState> tmStateCollection = Collections.emptyList(); + + if (tmTaskStateSnapshot != null) { + OperatorSubtaskState tmOpState = tmTaskStateSnapshot.getSubtaskStateByOperatorID(operatorID); + if (tmOpState != null) { + tmStateCollection = Collections.singletonList(tmOpState); + } + } + PrioritizedOperatorSubtaskState.Builder builder = + new PrioritizedOperatorSubtaskState.Builder(jmOpState, tmStateCollection); + return builder.build(); + } + } + } + + @Nonnull + @Override + public LocalRecoveryConfig createLocalRecoveryConfig() { + return Preconditions.checkNotNull(localRecoveryDirectoryProvider, + "Local state directory was never set for this test object!"); + } + + public void setLocalRecoveryConfig(LocalRecoveryConfig recoveryDirectoryProvider) { + this.localRecoveryDirectoryProvider = recoveryDirectoryProvider; } @Override @@ -134,13 +192,24 @@ public class TestTaskStateManager implements TaskStateManager { this.checkpointResponder = checkpointResponder; } - public Map<Long, TaskStateSnapshot> getTaskStateSnapshotsByCheckpointId() { - return taskStateSnapshotsByCheckpointId; + public Map<Long, TaskStateSnapshot> getJobManagerTaskStateSnapshotsByCheckpointId() { + return jobManagerTaskStateSnapshotsByCheckpointId; } - public void setTaskStateSnapshotsByCheckpointId(Map<Long, TaskStateSnapshot> taskStateSnapshotsByCheckpointId) { - this.taskStateSnapshotsByCheckpointId.clear(); - this.taskStateSnapshotsByCheckpointId.putAll(taskStateSnapshotsByCheckpointId); + public void setJobManagerTaskStateSnapshotsByCheckpointId( + Map<Long, TaskStateSnapshot> jobManagerTaskStateSnapshotsByCheckpointId) { + this.jobManagerTaskStateSnapshotsByCheckpointId.clear(); + this.jobManagerTaskStateSnapshotsByCheckpointId.putAll(jobManagerTaskStateSnapshotsByCheckpointId); + } + + public Map<Long, TaskStateSnapshot> getTaskManagerTaskStateSnapshotsByCheckpointId() { + return taskManagerTaskStateSnapshotsByCheckpointId; + } + + public void setTaskManagerTaskStateSnapshotsByCheckpointId( + Map<Long, TaskStateSnapshot> taskManagerTaskStateSnapshotsByCheckpointId) { + this.taskManagerTaskStateSnapshotsByCheckpointId.clear(); + 
this.taskManagerTaskStateSnapshotsByCheckpointId.putAll(taskManagerTaskStateSnapshotsByCheckpointId); } public long getReportedCheckpointId() { @@ -151,9 +220,15 @@ public class TestTaskStateManager implements TaskStateManager { this.reportedCheckpointId = reportedCheckpointId; } - public TaskStateSnapshot getLastTaskStateSnapshot() { - return taskStateSnapshotsByCheckpointId != null ? - taskStateSnapshotsByCheckpointId.get(reportedCheckpointId) + public TaskStateSnapshot getLastJobManagerTaskStateSnapshot() { + return jobManagerTaskStateSnapshotsByCheckpointId != null ? + jobManagerTaskStateSnapshotsByCheckpointId.get(reportedCheckpointId) + : null; + } + + public TaskStateSnapshot getLastTaskManagerTaskStateSnapshot() { + return taskManagerTaskStateSnapshotsByCheckpointId != null ? + taskManagerTaskStateSnapshotsByCheckpointId.get(reportedCheckpointId) : null; } @@ -181,6 +256,6 @@ public class TestTaskStateManager implements TaskStateManager { } setReportedCheckpointId(latestId); - setTaskStateSnapshotsByCheckpointId(taskStateSnapshotsByCheckpointId); + setJobManagerTaskStateSnapshotsByCheckpointId(taskStateSnapshotsByCheckpointId); } }
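Note on the change above: the reworked TestTaskStateManager keeps two snapshot maps per checkpoint, one for state acknowledged to the JobManager and one for the TaskManager-local copy, and prioritizedOperatorState(...) offers the local copy as the preferred restore alternative on top of the JobManager state. A minimal usage sketch, illustrative only and not part of the patch (the checkpoint id, variable names and the empty OperatorSubtaskState instances are made up for the example):

    TestTaskStateManager stateManager = new TestTaskStateManager();

    OperatorID operatorID = new OperatorID();
    OperatorSubtaskState jmOpState = new OperatorSubtaskState();  // as acknowledged to the JobManager
    OperatorSubtaskState tmOpState = new OperatorSubtaskState();  // the TaskManager-local duplicate

    TaskStateSnapshot jmSnapshot = new TaskStateSnapshot();
    jmSnapshot.putSubtaskStateByOperatorID(operatorID, jmOpState);
    TaskStateSnapshot tmSnapshot = new TaskStateSnapshot();
    tmSnapshot.putSubtaskStateByOperatorID(operatorID, tmOpState);

    stateManager.setJobManagerTaskStateSnapshotsByCheckpointId(Collections.singletonMap(42L, jmSnapshot));
    stateManager.setTaskManagerTaskStateSnapshotsByCheckpointId(Collections.singletonMap(42L, tmSnapshot));
    stateManager.setReportedCheckpointId(42L);

    // The returned PrioritizedOperatorSubtaskState lists tmOpState as the preferred
    // alternative and falls back to jmOpState if the local copy cannot be used.
    PrioritizedOperatorSubtaskState prioritized = stateManager.prioritizedOperatorState(operatorID);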
http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractCheckpointStateOutputStreamTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractCheckpointStateOutputStreamTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractCheckpointStateOutputStreamTestBase.java new file mode 100644 index 0000000..6b526f4 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractCheckpointStateOutputStreamTestBase.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.local.LocalDataOutputStream; +import org.apache.flink.core.fs.local.LocalFileSystem; +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionWithException; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.Random; +import java.util.UUID; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +/** + * Abstract base class for tests against checkpointing streams. + */ +public abstract class AbstractCheckpointStateOutputStreamTestBase extends TestLogger { + + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + + // ------------------------------------------------------------------------ + // Tests + // ------------------------------------------------------------------------ + + /** + * Validates that even empty streams create a file and a file state handle. 
+ */ + @Test + public void testEmptyState() throws Exception { + final FileSystem fs = FileSystem.getLocalFileSystem(); + final Path folder = baseFolder(); + final String fileName = "myFileName"; + final Path filePath = new Path(folder, fileName); + + final FileStateHandle handle; + try (FSDataOutputStream stream = createTestStream(fs, folder, fileName)) { + handle = closeAndGetResult(stream); + } + + // must have created a handle + assertNotNull(handle); + assertEquals(filePath, handle.getFilePath()); + + // the pointer path should exist as a directory + assertTrue(fs.exists(handle.getFilePath())); + assertFalse(fs.getFileStatus(filePath).isDir()); + + // the contents should be empty + try (FSDataInputStream in = handle.openInputStream()) { + assertEquals(-1, in.read()); + } + } + + /** + * Simple write and read test + */ + @Test + public void testWriteAndRead() throws Exception { + final FileSystem fs = FileSystem.getLocalFileSystem(); + final Path folder = baseFolder(); + final String fileName = "fooBarName"; + + final Random rnd = new Random(); + final byte[] data = new byte[1694523]; + + // write the data (mixed single byte writes and array writes) + final FileStateHandle handle; + try (FSDataOutputStream stream = createTestStream(fs, folder, fileName)) { + for (int i = 0; i < data.length; ) { + if (rnd.nextBoolean()) { + stream.write(data[i++]); + } else { + int len = rnd.nextInt(Math.min(data.length - i, 32)); + stream.write(data, i, len); + i += len; + } + } + handle = closeAndGetResult(stream); + } + + // (1) stream from handle must hold the contents + try (FSDataInputStream in = handle.openInputStream()) { + byte[] buffer = new byte[data.length]; + readFully(in, buffer); + assertArrayEquals(data, buffer); + } + + // (2) the pointer must point to a file with that contents + try (FSDataInputStream in = fs.open(handle.getFilePath())) { + byte[] buffer = new byte[data.length]; + readFully(in, buffer); + assertArrayEquals(data, buffer); + } + } + + /** + * Tests that the underlying stream file is deleted upon calling close. + */ + @Test + public void testCleanupWhenClosingStream() throws IOException { + final FileSystem fs = FileSystem.getLocalFileSystem(); + final Path folder = new Path(tmp.newFolder().toURI()); + final String fileName = "nonCreativeTestFileName"; + final Path path = new Path(folder, fileName); + + // write some test data and close the stream + try (FSDataOutputStream stream = createTestStream(fs, folder, fileName)) { + Random rnd = new Random(); + for (int i = 0; i < rnd.nextInt(1000); i++) { + stream.write(rnd.nextInt(100)); + } + assertTrue(fs.exists(path)); + } + + assertFalse(fs.exists(path)); + } + + /** + * Tests that the underlying stream file is deleted if the closeAndGetHandle method fails. + */ + @Test + public void testCleanupWhenFailingCloseAndGetHandle() throws IOException { + final Path folder = new Path(tmp.newFolder().toURI()); + final String fileName = "test_name"; + final Path filePath = new Path(folder, fileName); + + final FileSystem fs = spy(new TestFs((path) -> new FailingCloseStream(new File(path.getPath())))); + + FSDataOutputStream stream = createTestStream(fs, folder, fileName); + stream.write(new byte[] {1, 2, 3, 4, 5}); + + try { + closeAndGetResult(stream); + fail("Expected IOException"); + } + catch (IOException ignored) { + // expected exception + } + + verify(fs).delete(filePath, false); + } + + /** + * This test validates that a close operation can happen even while a 'closeAndGetHandle()' + * call is in progress. 
+ * <p> + * <p>That behavior is essential for fast cancellation (concurrent cleanup). + */ + @Test + public void testCloseDoesNotLock() throws Exception { + final Path folder = new Path(tmp.newFolder().toURI()); + final String fileName = "this-is-ignored-anyways.file"; + + final FileSystem fileSystem = spy(new TestFs((path) -> new BlockerStream())); + + final FSDataOutputStream checkpointStream = + createTestStream(fileSystem, folder, fileName); + + final OneShotLatch sync = new OneShotLatch(); + + final CheckedThread thread = new CheckedThread() { + + @Override + public void go() throws Exception { + sync.trigger(); + // that call should now block, because it accesses the position + closeAndGetResult(checkpointStream); + } + }; + thread.start(); + + sync.await(); + checkpointStream.close(); + + // the thread may or may not fail, that depends on the thread race + // it is not important for this test, important is that the thread does not freeze/lock up + try { + thread.sync(); + } catch (IOException ignored) {} + } + + /** + * Creates a new test stream instance. + */ + protected abstract FSDataOutputStream createTestStream( + FileSystem fs, + Path dir, + String fileName) throws IOException; + + /** + * Closes the stream successfully and returns a FileStateHandle to the result. + */ + protected abstract FileStateHandle closeAndGetResult(FSDataOutputStream stream) throws IOException; + + // ------------------------------------------------------------------------ + // utilities + // ------------------------------------------------------------------------ + + private Path baseFolder() throws Exception { + return new Path(new File(tmp.newFolder(), UUID.randomUUID().toString()).toURI()); + } + + private static void readFully(InputStream in, byte[] buffer) throws IOException { + int pos = 0; + int remaining = buffer.length; + + while (remaining > 0) { + int read = in.read(buffer, pos, remaining); + if (read == -1) { + throw new EOFException(); + } + + pos += read; + remaining -= read; + } + } + + private static class BlockerStream extends FSDataOutputStream { + + private final OneShotLatch blocker = new OneShotLatch(); + + @Override + public long getPos() throws IOException { + block(); + return 0L; + } + + @Override + public void write(int b) throws IOException { + block(); + } + + @Override + public void flush() throws IOException { + block(); + } + + @Override + public void sync() throws IOException { + block(); + } + + @Override + public void close() throws IOException { + blocker.trigger(); + } + + private void block() throws IOException { + try { + blocker.await(); + } catch (InterruptedException e) { + throw new IOException("interrupted"); + } + throw new IOException("closed"); + } + } + + // ------------------------------------------------------------------------ + + private static class FailingCloseStream extends LocalDataOutputStream { + + FailingCloseStream(File file) throws IOException { + super(file); + } + + @Override + public void close() throws IOException { + throw new IOException(); + } + } + + private static class TestFs extends LocalFileSystem { + + private final FunctionWithException<Path, FSDataOutputStream, IOException> streamFactory; + + TestFs(FunctionWithException<Path, FSDataOutputStream, IOException> streamFactory) { + this.streamFactory = streamFactory; + } + + @Override + public FSDataOutputStream create(Path filePath, WriteMode overwrite) throws IOException { + return streamFactory.apply(filePath); + } + } + +} 
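The new base class above follows a template-method pattern: the shared tests only touch the stream through the two abstract hooks, so covering another checkpoint stream type amounts to implementing those hooks. A minimal skeleton of such a subclass, as a sketch only (the stream type MyCheckpointStateOutputStream is hypothetical; the two real subclasses appear in the following diffs):

    public class MyCheckpointStateOutputStreamTest extends AbstractCheckpointStateOutputStreamTestBase {

        @Override
        protected FSDataOutputStream createTestStream(FileSystem fs, Path dir, String fileName) throws IOException {
            // create the stream under test against the given file system and target path
            return new MyCheckpointStateOutputStream(fs, new Path(dir, fileName));
        }

        @Override
        protected FileStateHandle closeAndGetResult(FSDataOutputStream stream) throws IOException {
            // finalize the stream and hand back the handle that the shared tests verify
            return ((MyCheckpointStateOutputStream) stream).closeAndGetHandle();
        }
    }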
http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStreamTest.java new file mode 100644 index 0000000..9ee59d2 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStreamTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; + +import java.io.IOException; + +/** + * Tests for the {@link FileBasedStateOutputStream}. + */ +public class FileBasedStateOutputStreamTest extends AbstractCheckpointStateOutputStreamTestBase { + + @Override + protected FSDataOutputStream createTestStream(FileSystem fs, Path dir, String fileName) throws IOException { + return new FileBasedStateOutputStream(fs, new Path(dir, fileName)); + } + + @Override + protected FileStateHandle closeAndGetResult(FSDataOutputStream stream) throws IOException { + return ((FileBasedStateOutputStream) stream).closeAndGetHandle(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStreamTest.java index 5a15b4b..4803e93 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStreamTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStreamTest.java @@ -18,291 +18,25 @@ package org.apache.flink.runtime.state.filesystem; -import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; -import org.apache.flink.core.fs.local.LocalDataOutputStream; -import org.apache.flink.core.fs.local.LocalFileSystem; -import org.apache.flink.core.testutils.CheckedThread; -import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.util.function.FunctionWithException; -import org.junit.Rule; 
-import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.EOFException; -import java.io.File; import java.io.IOException; -import java.io.InputStream; -import java.util.Random; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; /** * Tests for the {@link FsCheckpointMetadataOutputStream}. */ -public class FsCheckpointMetadataOutputStreamTest { - - @Rule - public final TemporaryFolder tmp = new TemporaryFolder(); - - // ------------------------------------------------------------------------ - // Tests - // ------------------------------------------------------------------------ - - /** - * Validates that even empty streams create a file and a file state handle. - */ - @Test - public void testEmptyState() throws Exception { - final FileSystem fs = FileSystem.getLocalFileSystem(); - - final Path checkpointDir = Path.fromLocalFile(tmp.newFolder()); - final Path metadataPath = new Path(checkpointDir, "myFileName"); - - final FsCompletedCheckpointStorageLocation location; - try (FsCheckpointMetadataOutputStream stream = new FsCheckpointMetadataOutputStream(fs, metadataPath, checkpointDir)) { - location = stream.closeAndFinalizeCheckpoint(); - } - - // must have created a handle - assertNotNull(location); - assertNotNull(location.getMetadataHandle()); - assertEquals(metadataPath, location.getMetadataHandle().getFilePath()); - - // the pointer path should exist as a file - assertTrue(fs.exists(metadataPath)); - assertFalse(fs.getFileStatus(metadataPath).isDir()); - - // the contents should be empty - try (FSDataInputStream in = location.getMetadataHandle().openInputStream()) { - assertEquals(-1, in.read()); - } - } - - /** - * Simple write and read test. - */ - @Test - public void testWriteAndRead() throws Exception { - final FileSystem fs = FileSystem.getLocalFileSystem(); - - final Path checkpointDir = Path.fromLocalFile(tmp.newFolder()); - final Path metadataPath = new Path(checkpointDir, "fooBarName"); - - final Random rnd = new Random(); - final byte[] data = new byte[1694523]; - - // write the data (mixed single byte writes and array writes) - final FsCompletedCheckpointStorageLocation completed; - try (FsCheckpointMetadataOutputStream stream = new FsCheckpointMetadataOutputStream(fs, metadataPath, checkpointDir)) { - for (int i = 0; i < data.length;) { - if (rnd.nextBoolean()) { - stream.write(data[i++]); - } - else { - int len = rnd.nextInt(Math.min(data.length - i, 32)); - stream.write(data, i, len); - i += len; - } - } - completed = stream.closeAndFinalizeCheckpoint(); - } - - // (1) stream from handle must hold the contents - try (FSDataInputStream in = completed.getMetadataHandle().openInputStream()) { - byte[] buffer = new byte[data.length]; - readFully(in, buffer); - assertArrayEquals(data, buffer); - } - - // (2) the pointer must point to a file with that contents - try (FSDataInputStream in = fs.open(completed.getMetadataHandle().getFilePath())) { - byte[] buffer = new byte[data.length]; - readFully(in, buffer); - assertArrayEquals(data, buffer); - } - } - - /** - * Tests that the underlying stream file is deleted upon calling close. 
- */ - @Test - public void testCleanupWhenClosingStream() throws IOException { - final FileSystem fs = FileSystem.getLocalFileSystem(); - - final Path checkpointDir = Path.fromLocalFile(tmp.newFolder()); - final Path metadataPath = new Path(checkpointDir, "nonCreativeTestFileName"); - - // write some test data and close the stream - try (FsCheckpointMetadataOutputStream stream = new FsCheckpointMetadataOutputStream(fs, metadataPath, checkpointDir)) { - Random rnd = new Random(); - for (int i = 0; i < rnd.nextInt(1000); i++) { - stream.write(rnd.nextInt(100)); - } - assertTrue(fs.exists(metadataPath)); - } - - assertFalse(fs.exists(metadataPath)); - assertTrue(fs.exists(checkpointDir)); - } - - /** - * Tests that the underlying stream file is deleted if the closeAndGetHandle method fails. - */ - @Test - public void testCleanupWhenFailingCloseAndGetHandle() throws IOException { - final Path checkpointDir = Path.fromLocalFile(tmp.newFolder()); - final Path metadataPath = new Path(checkpointDir, "test_name"); - - final FileSystem fs = spy(new TestFs((path) -> new FailingCloseStream(new File(path.getPath())))); - - FsCheckpointMetadataOutputStream stream = new FsCheckpointMetadataOutputStream(fs, metadataPath, checkpointDir); - stream.write(new byte[] {1, 2, 3, 4, 5}); - - try { - stream.closeAndFinalizeCheckpoint(); - fail("Expected IOException"); - } - catch (IOException ignored) { - // expected exception - } - - verify(fs).delete(metadataPath, false); - } +public class FsCheckpointMetadataOutputStreamTest extends AbstractCheckpointStateOutputStreamTestBase { - /** - * This test validates that a close operation can happen even while a 'closeAndGetHandle()' - * call is in progress. - * - * <p>That behavior is essential for fast cancellation (concurrent cleanup). 
- */ - @Test - public void testCloseDoesNotLock() throws Exception { - final Path checkpointDir = Path.fromLocalFile(tmp.newFolder()); - final Path metadataPath = new Path(checkpointDir, "this-is-ignored-anyways.file"); - - final FileSystem fileSystem = spy(new TestFs((path) -> new BlockerStream())); - - final FsCheckpointMetadataOutputStream checkpointStream = - new FsCheckpointMetadataOutputStream(fileSystem, metadataPath, checkpointDir); - - final OneShotLatch sync = new OneShotLatch(); - - final CheckedThread thread = new CheckedThread() { - - @Override - public void go() throws Exception { - sync.trigger(); - // that call should now block, because it accesses the position - checkpointStream.closeAndFinalizeCheckpoint(); - } - }; - thread.start(); - - sync.await(); - checkpointStream.close(); - - // the thread may or may not fail, that depends on the thread race - // it is not important for this test, important is that the thread does not freeze/lock up - try { - thread.sync(); - } catch (IOException ignored) {} + @Override + protected FSDataOutputStream createTestStream(FileSystem fs, Path dir, String fileName) throws IOException { + Path fullPath = new Path(dir, fileName); + return new FsCheckpointMetadataOutputStream(fs, fullPath, dir); } - // ------------------------------------------------------------------------ - // utilities - // ------------------------------------------------------------------------ - - private static void readFully(InputStream in, byte[] buffer) throws IOException { - int pos = 0; - int remaining = buffer.length; - - while (remaining > 0) { - int read = in.read(buffer, pos, remaining); - if (read == -1) { - throw new EOFException(); - } - - pos += read; - remaining -= read; - } - } - - private static class BlockerStream extends FSDataOutputStream { - - private final OneShotLatch blocker = new OneShotLatch(); - - @Override - public long getPos() throws IOException { - block(); - return 0L; - } - - @Override - public void write(int b) throws IOException { - block(); - } - - @Override - public void flush() throws IOException { - block(); - } - - @Override - public void sync() throws IOException { - block(); - } - - @Override - public void close() throws IOException { - blocker.trigger(); - } - - private void block() throws IOException { - try { - blocker.await(); - } catch (InterruptedException e) { - throw new IOException("interrupted"); - } - throw new IOException("closed"); - } - } - - // ------------------------------------------------------------------------ - - private static class FailingCloseStream extends LocalDataOutputStream { - - FailingCloseStream(File file) throws IOException { - super(file); - } - - @Override - public void close() throws IOException { - throw new IOException(); - } - } - - private static class TestFs extends LocalFileSystem { - - private final FunctionWithException<Path, FSDataOutputStream, IOException> streamFactory; - - TestFs(FunctionWithException<Path, FSDataOutputStream, IOException> streamFactory) { - this.streamFactory = streamFactory; - } - - @Override - public FSDataOutputStream create(Path filePath, WriteMode overwrite) throws IOException { - return streamFactory.apply(filePath); - } + @Override + protected FileStateHandle closeAndGetResult(FSDataOutputStream stream) throws IOException { + return ((FsCheckpointMetadataOutputStream) stream).closeAndFinalizeCheckpoint().getMetadataHandle(); } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java index 3ac3b6b..d83aa9f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java @@ -21,11 +21,12 @@ package org.apache.flink.runtime.state.heap; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; import org.apache.flink.runtime.state.KeyGroupsStateHandle; -import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; + import org.junit.Test; import java.io.BufferedInputStream; @@ -64,7 +65,7 @@ public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackend try (BufferedInputStream bis = new BufferedInputStream((new FileInputStream(resource.getFile())))) { stateHandle = InstantiationUtil.deserializeObject(bis, Thread.currentThread().getContextClassLoader()); } - keyedBackend.restore(Collections.<KeyedStateHandle>singleton(stateHandle)); + keyedBackend.restore(StateObjectCollection.singleton(stateHandle)); final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class); stateDescr.initializeSerializerUnlessSet(new ExecutionConfig()); http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java index b10c2c0..bf428dc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java @@ -23,6 +23,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.TestLocalRecoveryConfig; + import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -54,6 +56,7 @@ public abstract class HeapStateBackendTestBase { 16, new KeyGroupRange(0, 15), async, - new ExecutionConfig()); + new ExecutionConfig(), + TestLocalRecoveryConfig.disabled()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java index b40f179..d958fcc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java @@ -68,7 +68,7 @@ public class BackendForTestStream extends MemoryStateBackend { // ------------------------------------------------------------------------ public interface StreamFactory - extends SupplierWithException<CheckpointStateOutputStream, Exception>, java.io.Serializable {} + extends SupplierWithException<CheckpointStateOutputStream, IOException>, java.io.Serializable {} // ------------------------------------------------------------------------ @@ -119,7 +119,7 @@ public class BackendForTestStream extends MemoryStateBackend { } @Override - public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws Exception { + public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException { return streamFactory.get(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java index bc6f238..8dcd642 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.memory.MemoryType; +import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.testutils.category.OldAndFlip6; import org.apache.flink.util.TestLogger; @@ -100,6 +101,8 @@ public class NetworkBufferCalculationTest extends TestLogger { return new TaskManagerServicesConfiguration( InetAddress.getLoopbackAddress(), new String[] {}, + new String[] {}, + LocalRecoveryConfig.LocalRecoveryMode.DISABLED, networkConfig, QueryableStateConfiguration.disabled(), 1, http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 8f4ec5d..d693f47 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -50,6 +50,8 @@ import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.RpcUtils; import 
org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.state.LocalRecoveryConfig; +import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; @@ -64,6 +66,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; +import java.io.File; import java.net.InetAddress; import java.util.Arrays; import java.util.Collection; @@ -131,6 +134,14 @@ public class TaskExecutorITCase extends TestLogger { TestingUtils.infiniteTime(), TestingUtils.infiniteTime()); + final File[] taskExecutorLocalStateRootDirs = + new File[]{new File(System.getProperty("java.io.tmpdir"), "localRecovery")}; + + final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager( + LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + taskExecutorLocalStateRootDirs, + rpcService.getExecutor()); + ResourceManager<ResourceID> resourceManager = new StandaloneResourceManager( rpcService, FlinkResourceManager.RESOURCE_MANAGER_NAME, @@ -147,6 +158,7 @@ public class TaskExecutorITCase extends TestLogger { final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) .setTaskSlotTable(taskSlotTable) + .setTaskStateManager(taskStateManager) .build(); TaskExecutor taskExecutor = new TaskExecutor( http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index d7a1860..e972feb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; @@ -74,6 +75,8 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.state.LocalRecoveryConfig; +import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; @@ -95,6 +98,7 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; import org.junit.rules.TestName; import org.mockito.ArgumentCaptor; 
import org.mockito.Matchers; @@ -103,6 +107,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; +import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.Collection; @@ -137,6 +142,9 @@ import static org.mockito.Mockito.when; @Category(Flip6.class) public class TaskExecutorTest extends TestLogger { + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + private static final Time timeout = Time.milliseconds(10000L); private TestingRpcService rpc; @@ -227,10 +235,16 @@ public class TaskExecutorTest extends TestLogger { final SimpleJobMasterGateway jobMasterGateway = new SimpleJobMasterGateway( CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId))); + TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( + LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + new File[]{tmp.newFolder()}, + Executors.directExecutor()); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) .setTaskSlotTable(taskSlotTable) .setJobLeaderService(jobLeaderService) + .setTaskStateManager(localStateStoresManager) .build(); final TaskExecutor taskManager = new TaskExecutor( @@ -310,9 +324,15 @@ public class TaskExecutorTest extends TestLogger { HeartbeatServices heartbeatServices = new HeartbeatServices(heartbeatInterval, heartbeatTimeout); + TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( + LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + new File[]{tmp.newFolder()}, + Executors.directExecutor()); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) .setTaskSlotTable(taskSlotTable) + .setTaskStateManager(localStateStoresManager) .build(); final TaskExecutor taskManager = new TaskExecutor( @@ -439,9 +459,15 @@ public class TaskExecutorTest extends TestLogger { } ); + TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( + LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + new File[]{tmp.newFolder()}, + Executors.directExecutor()); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) .setTaskSlotTable(taskSlotTable) + .setTaskStateManager(localStateStoresManager) .build(); final TaskExecutor taskManager = new TaskExecutor( @@ -518,9 +544,15 @@ public class TaskExecutorTest extends TestLogger { final SlotReport slotReport = new SlotReport(); when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport); + TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( + LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + new File[]{tmp.newFolder()}, + Executors.directExecutor()); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) .setTaskSlotTable(taskSlotTable) + .setTaskStateManager(localStateStoresManager) .build(); TaskExecutor taskManager = new TaskExecutor( @@ -575,9 +607,15 @@ public class TaskExecutorTest extends TestLogger { final SlotReport slotReport = new SlotReport(); when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport); + TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( + 
LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + new File[]{tmp.newFolder()}, + Executors.directExecutor()); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) .setTaskSlotTable(taskSlotTable) + .setTaskStateManager(localStateStoresManager) .build(); TaskExecutor taskManager = new TaskExecutor( @@ -690,10 +728,16 @@ public class TaskExecutorTest extends TestLogger { when(networkEnvironment.createKvStateTaskRegistry(eq(jobId), eq(jobVertexId))).thenReturn(mock(TaskKvStateRegistry.class)); when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher); + TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( + LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + new File[]{tmp.newFolder()}, + Executors.directExecutor()); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setNetworkEnvironment(networkEnvironment) .setTaskSlotTable(taskSlotTable) .setJobManagerTable(jobManagerTable) + .setTaskStateManager(localStateStoresManager) .build(); TaskExecutor taskManager = new TaskExecutor( @@ -787,11 +831,17 @@ public class TaskExecutorTest extends TestLogger { final SlotID slotId = new SlotID(taskManagerLocation.getResourceID(), 0); final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN); + TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( + LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + new File[]{tmp.newFolder()}, + Executors.directExecutor()); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) .setTaskSlotTable(taskSlotTable) .setJobManagerTable(jobManagerTable) .setJobLeaderService(jobLeaderService) + .setTaskStateManager(localStateStoresManager) .build(); TaskExecutor taskManager = new TaskExecutor( @@ -891,11 +941,17 @@ public class TaskExecutorTest extends TestLogger { rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); rpc.registerGateway(jobManagerAddress, jobMasterGateway); + TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( + LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + new File[]{tmp.newFolder()}, + Executors.directExecutor()); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) .setTaskSlotTable(taskSlotTable) .setJobManagerTable(jobManagerTable) .setJobLeaderService(jobLeaderService) + .setTaskStateManager(localStateStoresManager) .build(); TaskExecutor taskManager = new TaskExecutor( @@ -1017,12 +1073,18 @@ public class TaskExecutorTest extends TestLogger { final NetworkEnvironment networkMock = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS); + TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( + LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + new File[]{tmp.newFolder()}, + Executors.directExecutor()); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) .setNetworkEnvironment(networkMock) .setTaskSlotTable(taskSlotTable) .setJobLeaderService(jobLeaderService) .setJobManagerTable(jobManagerTable) + .setTaskStateManager(localStateStoresManager) .build(); final TaskExecutor taskManager = new TaskExecutor( @@ -1130,10 +1192,16 @@ public class TaskExecutorTest extends 
TestLogger { final JMTMRegistrationSuccess registrationMessage = new JMTMRegistrationSuccess(ResourceID.generate()); final JobManagerTable jobManagerTableMock = spy(new JobManagerTable()); + TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( + LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + new File[]{tmp.newFolder()}, + Executors.directExecutor()); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) .setJobManagerTable(jobManagerTableMock) .setJobLeaderService(jobLeaderService) + .setTaskStateManager(localStateStoresManager) .build(); final TaskExecutor taskExecutor = new TaskExecutor( @@ -1198,9 +1266,15 @@ public class TaskExecutorTest extends TestLogger { rpc.registerGateway(rmAddress, rmGateway); + TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( + LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + new File[]{tmp.newFolder()}, + Executors.directExecutor()); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) .setTaskSlotTable(taskSlotTable) + .setTaskStateManager(localStateStoresManager) .build(); final TaskExecutor taskExecutor = new TaskExecutor( @@ -1248,9 +1322,15 @@ public class TaskExecutorTest extends TestLogger { Collections.singleton(ResourceProfile.UNKNOWN), timerService); + TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( + LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + new File[]{tmp.newFolder()}, + Executors.directExecutor()); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) .setTaskSlotTable(taskSlotTable) + .setTaskStateManager(localStateStoresManager) .build(); final TaskExecutor taskExecutor = new TaskExecutor( http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java index 2bd39f8..3af9a46 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java @@ -63,8 +63,7 @@ public class TaskManagerServicesBuilder { taskSlotTable = mock(TaskSlotTable.class); jobManagerTable = new JobManagerTable(); jobLeaderService = new JobLeaderService(taskManagerLocation); - taskStateManager = new TaskExecutorLocalStateStoresManager(); - + taskStateManager = mock(TaskExecutorLocalStateStoresManager.class); } public TaskManagerServicesBuilder setTaskManagerLocation(TaskManagerLocation taskManagerLocation) { http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java 
index e8ecc56..581d8ed 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.FlinkResourceManager; import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices; @@ -45,6 +46,8 @@ import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.state.LocalRecoveryConfig; +import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.TestLogger; @@ -158,6 +161,11 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger { network.start(); + TaskExecutorLocalStateStoresManager storesManager = new TaskExecutorLocalStateStoresManager( + LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + ioManager.getSpillingDirectories(), + Executors.directExecutor()); + MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(config); // create the task manager @@ -169,6 +177,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger { memManager, ioManager, network, + storesManager, numberOfSlots, highAvailabilityServices, new TaskManagerMetricGroup(NoOpMetricRegistry.INSTANCE, connectionInfo.getHostname(), connectionInfo.getResourceID().getResourceIdString())); http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java index 107826f..fce9620 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java @@ -18,15 +18,13 @@ package org.apache.flink.runtime.taskmanager; -import static org.junit.Assert.*; - -import org.apache.commons.io.FileUtils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices; @@ -36,6 +34,8 @@ import org.apache.flink.runtime.util.StartupUtils; import org.apache.flink.util.NetUtils; import org.apache.flink.util.TestLogger; +import akka.actor.ActorSystem; +import org.apache.commons.io.FileUtils; import org.junit.After; import org.junit.Assume; import org.junit.Before; @@ -43,16 +43,19 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import scala.Option; - import java.io.File; import java.io.IOException; -import java.net.InetAddress; import java.net.BindException; +import java.net.InetAddress; import java.net.ServerSocket; import java.util.ArrayList; import java.util.List; +import scala.Option; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + /** * Tests that check how the TaskManager behaves when encountering startup * problems. @@ -246,11 +249,11 @@ public class TaskManagerStartupTest extends TestLogger { cfg.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost"); cfg.setInteger(TaskManagerOptions.DATA_PORT, blocker.getLocalPort()); cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 1L); - + ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(cfg); TaskManager.startTaskManagerComponentsAndActor( cfg, ResourceID.generate(), - null, + actorSystem, highAvailabilityServices, NoOpMetricRegistry.INSTANCE, "localhost", http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java index bc9b04f..ce41b10 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java @@ -39,9 +39,9 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory { protected volatile OneShotLatch blocker; protected volatile OneShotLatch waiter; - MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream; + BlockingCheckpointOutputStream lastCreatedStream; - public MemCheckpointStreamFactory.MemoryCheckpointOutputStream getLastCreatedStream() { + public BlockingCheckpointOutputStream getLastCreatedStream() { return lastCreatedStream; } @@ -70,80 +70,13 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory { } @Override - public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws Exception { - this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) { - - private int afterNInvocations = afterNumberInvocations; - private final OneShotLatch streamBlocker = blocker; - private final OneShotLatch streamWaiter = waiter; - - @Override - public void write(int b) throws IOException { - - unblockWaiter(); - - if (afterNInvocations > 0) { - --afterNInvocations; - } else { - awaitBlocker(); - } - - try { - super.write(b); - } catch (IOException ex) { - unblockWaiter(); - throw ex; - } - - if (0 == afterNInvocations) { - unblockWaiter(); - } - - // We also check for close here, in case the underlying stream does not do 
this - if (isClosed()) { - throw new IOException("Stream closed."); - } - } - - //We override this to ensure that writes go through the blocking #write(int) method! - @Override - public void write(byte[] b, int off, int len) throws IOException { - for (int i = 0; i < len; i++) { - write(b[off + i]); - } - } - - @Override - public void close() { - super.close(); - // trigger all the latches, essentially all blocking ops on the stream should resume after close. - unblockAll(); - } - - private void unblockWaiter() { - if (null != streamWaiter) { - streamWaiter.trigger(); - } - } - - private void awaitBlocker() { - if (null != streamBlocker) { - try { - streamBlocker.await(); - } catch (InterruptedException ignored) { - } - } - } - - private void unblockAll() { - if (null != streamWaiter) { - streamWaiter.trigger(); - } - if (null != streamBlocker) { - streamBlocker.trigger(); - } - } - }; + public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException { + + this.lastCreatedStream = new BlockingCheckpointOutputStream( + new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize), + waiter, + blocker, + afterNumberInvocations); return lastCreatedStream; } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingCheckpointOutputStream.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingCheckpointOutputStream.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingCheckpointOutputStream.java new file mode 100644 index 0000000..fd8e59d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingCheckpointOutputStream.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.IOUtils; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + /** + * This test utility class provides a CheckpointStateOutputStream (which is also a FSDataOutputStream) that can block + * on a latch and takes a latch that the creator can block on until the stream is closed. This is typically used to + * test that a blocking read can be interrupted / closed. 
+ */ +public class BlockingCheckpointOutputStream extends CheckpointStreamFactory.CheckpointStateOutputStream { + + /** Optional delegate stream to which all ops are forwarded. */ + private final FSDataOutputStream delegate; + + /** Optional latch on which the stream blocks, e.g. until the test triggers it after some call to #close(). */ + private final OneShotLatch triggerUnblock; + + /** Optional latch on which the test can block until the stream is blocked at the desired blocking position. */ + private final OneShotLatch waitForBlocking; + + /** Closed flag. */ + private final AtomicBoolean closed; + + /** The read position at which this will block. 0 by default. */ + private final long blockAtPosition; + + /** The current read position. */ + private long position; + + public BlockingCheckpointOutputStream( + @Nullable OneShotLatch waitForBlocking, + @Nullable OneShotLatch triggerUnblock) { + this(null, waitForBlocking, triggerUnblock, 0L); + } + + public BlockingCheckpointOutputStream( + @Nullable FSDataOutputStream delegate, + @Nullable OneShotLatch waitForBlock, + @Nullable OneShotLatch triggerUnblock) { + this(delegate, waitForBlock, triggerUnblock, 0L); + } + + public BlockingCheckpointOutputStream( + @Nullable FSDataOutputStream delegate, + @Nullable OneShotLatch waitForBlocking, + @Nullable OneShotLatch triggerUnblock, + long blockAtPosition) { + + this.delegate = delegate; + this.triggerUnblock = triggerUnblock; + this.waitForBlocking = waitForBlocking; + this.blockAtPosition = blockAtPosition; + if (delegate != null) { + try { + this.position = delegate.getPos(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + this.position = 0; + } + this.closed = new AtomicBoolean(false); + } + + @Override + public void write(int b) throws IOException { + + if (position == blockAtPosition) { + unblockWaiter(); + awaitUnblocker(); + } + + if (delegate != null) { + try { + delegate.write(b); + } catch (IOException ex) { + unblockWaiter(); + throw ex; + } + } + + // We also check for close here, in case the underlying stream does not do this + if (closed.get()) { + throw new IOException("Stream closed."); + } + + ++position; + } + + //We override this to ensure that writes go through the blocking #write(int) method! + @Override + public void write(byte[] b, int off, int len) throws IOException { + for (int i = 0; i < len; i++) { + write(b[off + i]); + } + } + + @Override + public long getPos() throws IOException { + return position; + } + + @Override + public void flush() throws IOException { + if (delegate != null) { + delegate.flush(); + } + } + + @Override + public void sync() throws IOException { + if (delegate != null) { + delegate.sync(); + } + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + if (delegate != null) { + IOUtils.closeQuietly(delegate); + } + // trigger all the latches, essentially all blocking ops on the stream should resume after close. 
+ unblockAll(); + } + } + + private void unblockWaiter() { + if (null != waitForBlocking) { + waitForBlocking.trigger(); + } + } + + private void awaitUnblocker() { + if (null != triggerUnblock) { + try { + triggerUnblock.await(); + } catch (InterruptedException ignored) { + } + } + } + + private void unblockAll() { + if (null != waitForBlocking) { + waitForBlocking.trigger(); + } + if (null != triggerUnblock) { + triggerUnblock.trigger(); + } + } + + @Nullable + @Override + public StreamStateHandle closeAndGetHandle() throws IOException { + + if (!closed.compareAndSet(false, true)) { + throw new IOException("Stream was already closed!"); + } + + if (delegate instanceof CheckpointStreamFactory.CheckpointStateOutputStream) { + StreamStateHandle streamStateHandle = + ((CheckpointStreamFactory.CheckpointStateOutputStream) delegate).closeAndGetHandle(); + unblockAll(); + return streamStateHandle; + } else { + unblockAll(); + throw new IOException("Delegate is not a CheckpointStateOutputStream!"); + } + } + + public boolean isClosed() { + return closed.get(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingFSDataInputStream.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingFSDataInputStream.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingFSDataInputStream.java new file mode 100644 index 0000000..4721980 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingFSDataInputStream.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.util.IOUtils; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This test utility class provides a {@link FSDataInputStream} that can block on a latch and takes a latch that + * the creator can block on until the stream is closed. This is typically used to test that a blocking read can be + * interrupted / closed. + */ +public class BlockingFSDataInputStream extends FSDataInputStream { + + /** Optional delegate stream to which all ops are forwarded. */ + private final FSDataInputStream delegate; + + /** Optional latch on which the stream blocks, e.g. until the test triggers it after some call to #close(). */ + private final OneShotLatch triggerUnblock; + + /** Optional latch on which the test can block until the stream is blocked at the desired blocking position. 
*/ + private final OneShotLatch waitUntilStreamBlocked; + + /** Closed flag. */ + private final AtomicBoolean closed; + + /** The read position at which this will block. 0 by default. */ + private final long blockAtPosition; + + /** The current read position. */ + private long position; + + public BlockingFSDataInputStream( + @Nullable OneShotLatch waitForBlock, + @Nullable OneShotLatch triggerUnblock) { + this(null, waitForBlock, triggerUnblock, 0L); + } + + public BlockingFSDataInputStream( + @Nullable FSDataInputStream delegate, + @Nullable OneShotLatch waitForBlock, + @Nullable OneShotLatch triggerUnblock) { + this(delegate, waitForBlock, triggerUnblock, 0L); + } + + public BlockingFSDataInputStream( + @Nullable FSDataInputStream delegate, + @Nullable OneShotLatch waitForBlock, + @Nullable OneShotLatch triggerUnblock, + long blockAtPosition) { + + this.delegate = delegate; + this.triggerUnblock = triggerUnblock; + this.waitUntilStreamBlocked = waitForBlock; + this.blockAtPosition = blockAtPosition; + if (delegate != null) { + try { + this.position = delegate.getPos(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + this.position = 0; + } + this.closed = new AtomicBoolean(false); + } + + @Override + public void seek(long desired) throws IOException { + if (delegate != null) { + delegate.seek(desired); + } + this.position = desired; + } + + @Override + public long getPos() throws IOException { + return position; + } + + @Override + public int read() throws IOException { + + if (position == blockAtPosition) { + unblockWaiter(); + awaitBlocker(); + } + + int val = 0; + if (delegate != null) { + try { + val = delegate.read(); + } catch (IOException ex) { + unblockWaiter(); + throw ex; + } + } + + // We also check for close here, in case the underlying stream does not do this + if (closed.get()) { + throw new IOException("Stream closed."); + } else { + ++position; + return val; + } + } + + + @Override + public int read(byte[] b, int off, int len) throws IOException { + // We override this to ensure that we use the blocking read method internally. + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return 0; + } + + int c = read(); + if (c == -1) { + return -1; + } + b[off] = (byte) c; + + int i = 1; + try { + for (; i < len; i++) { + c = read(); + if (c == -1) { + break; + } + b[off + i] = (byte) c; + } + } catch (IOException ee) { + } + return i; + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + if (delegate != null) { + IOUtils.closeQuietly(delegate); + } + // trigger all the latches, essentially all blocking ops on the stream should resume after close. 
+ unblockAll(); + } + } + + private void unblockWaiter() { + if (null != waitUntilStreamBlocked) { + waitUntilStreamBlocked.trigger(); + } + } + + private void awaitBlocker() { + if (null != triggerUnblock) { + try { + triggerUnblock.await(); + } catch (InterruptedException ignored) { + } + } + } + + private void unblockAll() { + if (null != waitUntilStreamBlocked) { + waitUntilStreamBlocked.trigger(); + } + if (null != triggerUnblock) { + triggerUnblock.trigger(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java index fe293d7..cc0d4fc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java @@ -56,8 +56,10 @@ import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.TaskLocalStateStore; +import org.apache.flink.runtime.state.TaskLocalStateStoreImpl; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.TaskStateManagerImpl; +import org.apache.flink.runtime.state.TestLocalRecoveryConfig; import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.Task; @@ -68,8 +70,9 @@ import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.testutils.TestJvmProcess; import org.apache.flink.util.OperatingSystem; import org.apache.flink.util.SerializedValue; - +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.util.Collections; import java.util.concurrent.CompletableFuture; @@ -86,6 +89,9 @@ import static org.mockito.Mockito.when; */ public class JvmExitOnFatalErrorTest { + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Test public void testExitJvmOnOutOfMemory() throws Exception { // this test works only on linux @@ -138,12 +144,13 @@ public class JvmExitOnFatalErrorTest { System.err.println("creating task"); // we suppress process exits via errors here to not - // have a test that exits accidentally due to a programming error + // have a test that exits accidentally due to a programming error try { final Configuration taskManagerConfig = new Configuration(); taskManagerConfig.setBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY, true); final JobID jid = new JobID(); + final AllocationID allocationID = new AllocationID(); final JobVertexID jobVertexId = new JobVertexID(); final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(); final AllocationID slotAllocationId = new AllocationID(); @@ -172,7 +179,15 @@ public class JvmExitOnFatalErrorTest { BlobCacheService blobService = new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class)); - final TaskLocalStateStore localStateStore = new TaskLocalStateStore(jid, jobVertexId, 0); + final TaskLocalStateStore localStateStore = + new TaskLocalStateStoreImpl( + jid, + allocationID, + 
jobVertexId, + 0, + TestLocalRecoveryConfig.disabled(), + executor); + final TaskStateManager slotStateManager = new TaskStateManagerImpl( jid, http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestByteStreamStateHandleDeepCompare.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestByteStreamStateHandleDeepCompare.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestByteStreamStateHandleDeepCompare.java deleted file mode 100644 index 7d8797b..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestByteStreamStateHandleDeepCompare.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.util; - -import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; -import org.apache.flink.util.InstantiationUtil; - -import java.io.IOException; -import java.io.Serializable; -import java.util.Arrays; - -public class TestByteStreamStateHandleDeepCompare extends ByteStreamStateHandle { - - private static final long serialVersionUID = -4946526195523509L; - - public TestByteStreamStateHandleDeepCompare(String handleName, byte[] data) { - super(handleName, data); - } - - @Override - public boolean equals(Object o) { - if (!super.equals(o)) { - return false; - } - ByteStreamStateHandle other = (ByteStreamStateHandle) o; - return Arrays.equals(getData(), other.getData()); - } - - @Override - public int hashCode() { - return 31 * super.hashCode() + Arrays.hashCode(getData()); - } - - public static StreamStateHandle fromSerializable(String handleName, Serializable value) throws IOException { - return new TestByteStreamStateHandleDeepCompare(handleName, InstantiationUtil.serializeObject(value)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala index da753ae..5e60be9 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala @@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment import 
org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup +import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation} @@ -38,6 +39,7 @@ class TestingTaskManager( memoryManager: MemoryManager, ioManager: IOManager, network: NetworkEnvironment, + taskManagerStateStore: TaskExecutorLocalStateStoresManager, numberOfSlots: Int, highAvailabilityServices: HighAvailabilityServices, taskManagerMetricGroup : TaskManagerMetricGroup) @@ -48,6 +50,7 @@ class TestingTaskManager( memoryManager, ioManager, network, + taskManagerStateStore, numberOfSlots, highAvailabilityServices, taskManagerMetricGroup) @@ -59,6 +62,7 @@ class TestingTaskManager( memoryManager: MemoryManager, ioManager: IOManager, network: NetworkEnvironment, + taskManagerLocalStateStoresManager: TaskExecutorLocalStateStoresManager, numberOfSlots: Int, highAvailabilityServices: HighAvailabilityServices, taskManagerMetricGroup : TaskManagerMetricGroup) { @@ -69,6 +73,7 @@ class TestingTaskManager( memoryManager, ioManager, network, + taskManagerLocalStateStoresManager, numberOfSlots, highAvailabilityServices, taskManagerMetricGroup) http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 4fc0283..344255f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -60,9 +60,11 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator; +import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; @@ -212,6 +214,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** Unique ID of this backend. */ private UUID backendUID; + /** The configuration of local recovery. 
*/ + private final LocalRecoveryConfig localRecoveryConfig; + public RocksDBKeyedStateBackend( String operatorIdentifier, ClassLoader userCodeClassLoader, @@ -223,7 +228,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { int numberOfKeyGroups, KeyGroupRange keyGroupRange, ExecutionConfig executionConfig, - boolean enableIncrementalCheckpointing + boolean enableIncrementalCheckpointing, + LocalRecoveryConfig localRecoveryConfig ) throws IOException { super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig); @@ -253,6 +259,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { String.format("Could not create RocksDB data directory at %s.", instanceBasePath.getAbsolutePath())); } + this.localRecoveryConfig = Preconditions.checkNotNull(localRecoveryConfig); this.keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1; this.kvStateInformation = new HashMap<>(); this.restoredKvStateMetaInfos = new HashMap<>(); @@ -365,10 +372,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { * @param streamFactory The factory that we can use for writing our state to streams. * @param checkpointOptions Options for how to perform this checkpoint. * @return Future to the state handle of the snapshot data. - * @throws Exception + * @throws Exception indicating a problem in the synchronous part of the checkpoint. */ @Override - public RunnableFuture<KeyedStateHandle> snapshot( + public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot( final long checkpointId, final long timestamp, final CheckpointStreamFactory streamFactory, @@ -382,7 +389,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } - private RunnableFuture<KeyedStateHandle> snapshotIncrementally( + private RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotIncrementally( final long checkpointId, final long checkpointTimestamp, final CheckpointStreamFactory checkpointStreamFactory) throws Exception { @@ -396,7 +403,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. 
Returning null.", checkpointTimestamp); } - return DoneFuture.nullValue(); + return DoneFuture.of(SnapshotResult.empty()); } final RocksDBIncrementalSnapshotOperation<K> snapshotOperation = @@ -414,11 +421,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { throw e; } - return new FutureTask<KeyedStateHandle>( - new Callable<KeyedStateHandle>() { + return new FutureTask<SnapshotResult<KeyedStateHandle>>( + new Callable<SnapshotResult<KeyedStateHandle>>() { @Override - public KeyedStateHandle call() throws Exception { - return snapshotOperation.materializeSnapshot(); + public SnapshotResult<KeyedStateHandle> call() throws Exception { + KeyedStateHandle keyedStateHandle = snapshotOperation.materializeSnapshot(); + return SnapshotResult.of(keyedStateHandle); } } ) { @@ -435,7 +443,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { }; } - private RunnableFuture<KeyedStateHandle> snapshotFully( + private RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotFully( final long checkpointId, final long timestamp, final CheckpointStreamFactory streamFactory) throws Exception { @@ -450,15 +458,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", timestamp); } - return DoneFuture.nullValue(); + return DoneFuture.of(SnapshotResult.empty()); } snapshotOperation = new RocksDBFullSnapshotOperation<>(this, streamFactory, snapshotCloseableRegistry); snapshotOperation.takeDBSnapShot(checkpointId, timestamp); // implementation of the async IO operation, based on FutureTask - AbstractAsyncCallableWithResources<KeyedStateHandle> ioCallable = - new AbstractAsyncCallableWithResources<KeyedStateHandle>() { + AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable = + new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() { @Override protected void acquireResources() throws Exception { @@ -493,7 +501,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } @Override - public KeyGroupsStateHandle performOperation() throws Exception { + public SnapshotResult<KeyedStateHandle> performOperation() throws Exception { long startTime = System.currentTimeMillis(); if (isStopped()) { @@ -505,7 +513,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.", streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime)); - return snapshotOperation.getSnapshotResultStateHandle(); + KeyGroupsStateHandle snapshotResultStateHandle = snapshotOperation.getSnapshotResultStateHandle(); + return SnapshotResult.of(snapshotResultStateHandle); } };
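The hunk above changes RocksDBKeyedStateBackend#snapshot to return RunnableFuture<SnapshotResult<KeyedStateHandle>> instead of a bare handle future. The following is a minimal caller-side sketch, not part of the commit: only SnapshotResult.of(...) and SnapshotResult.empty() are visible in the diff itself, and the accessor getJobManagerOwnedSnapshot() is assumed from the SnapshotResult class introduced alongside this change and should be checked against that file.

import java.util.concurrent.RunnableFuture;

import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;

final class SnapshotResultUsageSketch {

    // Illustrative only: run both parts of a snapshot and unwrap the job-manager-owned handle.
    static KeyedStateHandle runSnapshot(
            RocksDBKeyedStateBackend<?> backend,
            long checkpointId,
            long timestamp,
            CheckpointStreamFactory streamFactory,
            CheckpointOptions checkpointOptions) throws Exception {

        // snapshot(...) now yields a SnapshotResult wrapper instead of a bare KeyedStateHandle.
        RunnableFuture<SnapshotResult<KeyedStateHandle>> future =
                backend.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);

        future.run(); // materialize the asynchronous part
        SnapshotResult<KeyedStateHandle> result = future.get();

        // Accessor name assumed; for SnapshotResult.empty() this is expected to be null.
        return result.getJobManagerOwnedSnapshot();
    }

    private SnapshotResultUsageSketch() {}
}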

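As a usage note for the blocking stream test utilities introduced in this commit, the sketch below assembles the pattern their Javadoc describes; it is illustrative and not taken from the commit. A writer thread blocks inside BlockingCheckpointOutputStream, the test waits on the first latch until that point is reached, and closing the stream triggers all latches so the blocked write resumes and fails with an IOException. The same pattern applies to BlockingFSDataInputStream with a blocked read.

import java.io.IOException;

import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;

final class BlockingStreamUsageSketch {

    static void blockAndCancelWrite() throws Exception {
        OneShotLatch waitForBlocking = new OneShotLatch(); // test blocks here until the stream is blocked
        OneShotLatch triggerUnblock = new OneShotLatch();  // stream blocks here until triggered (or closed)

        BlockingCheckpointOutputStream stream = new BlockingCheckpointOutputStream(
                new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(16 * 1024),
                waitForBlocking,
                triggerUnblock);

        Thread writer = new Thread(() -> {
            try {
                stream.write(42); // blocks at position 0 until the stream is unblocked or closed
            } catch (IOException expectedAfterClose) {
                // closing the stream while the write is blocked is expected to end up here
            }
        });
        writer.start();

        waitForBlocking.await(); // returns once the writer has reached the blocking position
        stream.close();          // close() triggers all latches, so the blocked write resumes and fails
        writer.join();
    }

    private BlockingStreamUsageSketch() {}
}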