http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java new file mode 100644 index 0000000..481fb98 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.FloatSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.typeutils.runtime.ValueSerializer; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.state.KvState; +import org.apache.flink.runtime.state.KvStateSnapshot; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.types.StringValue; +import org.apache.flink.util.OperatingSystem; + +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.Random; +import java.util.UUID; + +import static org.junit.Assert.*; + +public class FileStateBackendTest { + + @Test + public void testSetupAndSerialization() { + File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); + try { + final String backendDir = localFileUri(tempDir); + FsStateBackend originalBackend = new FsStateBackend(backendDir); + + assertFalse(originalBackend.isInitialized()); + assertEquals(new URI(backendDir), originalBackend.getBasePath().toUri()); + assertNull(originalBackend.getCheckpointDirectory()); + + // serialize / copy the backend + FsStateBackend backend = CommonTestUtils.createCopySerializable(originalBackend); + assertFalse(backend.isInitialized()); 
+ assertEquals(new URI(backendDir), backend.getBasePath().toUri()); + assertNull(backend.getCheckpointDirectory()); + + // no file operations should be possible right now + try { + backend.checkpointStateSerializable("exception train rolling in", 2L, System.currentTimeMillis()); + fail("should fail with an exception"); + } catch (IllegalStateException e) { + // supreme! + } + + backend.initializeForJob(new JobID()); + assertNotNull(backend.getCheckpointDirectory()); + + File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath()); + assertTrue(checkpointDir.exists()); + assertTrue(isDirectoryEmpty(checkpointDir)); + + backend.disposeAllStateForCurrentJob(); + assertNull(backend.getCheckpointDirectory()); + + assertTrue(isDirectoryEmpty(tempDir)); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + deleteDirectorySilently(tempDir); + } + } + + @Test + public void testSerializableState() { + File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); + try { + FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir))); + backend.initializeForJob(new JobID()); + + File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath()); + + String state1 = "dummy state"; + String state2 = "row row row your boat"; + Integer state3 = 42; + + StateHandle<String> handle1 = backend.checkpointStateSerializable(state1, 439568923746L, System.currentTimeMillis()); + StateHandle<String> handle2 = backend.checkpointStateSerializable(state2, 439568923746L, System.currentTimeMillis()); + StateHandle<Integer> handle3 = backend.checkpointStateSerializable(state3, 439568923746L, System.currentTimeMillis()); + + assertFalse(isDirectoryEmpty(checkpointDir)); + assertEquals(state1, handle1.getState(getClass().getClassLoader())); + handle1.discardState(); + + assertFalse(isDirectoryEmpty(checkpointDir)); + assertEquals(state2, 
handle2.getState(getClass().getClassLoader())); + handle2.discardState(); + + assertFalse(isDirectoryEmpty(checkpointDir)); + assertEquals(state3, handle3.getState(getClass().getClassLoader())); + handle3.discardState(); + + assertTrue(isDirectoryEmpty(checkpointDir)); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + deleteDirectorySilently(tempDir); + } + } + + @Test + public void testStateOutputStream() { + File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); + try { + FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir))); + backend.initializeForJob(new JobID()); + + File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath()); + + byte[] state1 = new byte[1274673]; + byte[] state2 = new byte[1]; + byte[] state3 = new byte[0]; + byte[] state4 = new byte[177]; + + Random rnd = new Random(); + rnd.nextBytes(state1); + rnd.nextBytes(state2); + rnd.nextBytes(state3); + rnd.nextBytes(state4); + + long checkpointId = 97231523452L; + + FsStateBackend.FsCheckpointStateOutputStream stream1 = + backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis()); + FsStateBackend.FsCheckpointStateOutputStream stream2 = + backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis()); + FsStateBackend.FsCheckpointStateOutputStream stream3 = + backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis()); + + stream1.write(state1); + stream2.write(state2); + stream3.write(state3); + + FileStreamStateHandle handle1 = stream1.closeAndGetHandle(); + FileStreamStateHandle handle2 = stream2.closeAndGetHandle(); + FileStreamStateHandle handle3 = stream3.closeAndGetHandle(); + + // use with try-with-resources + StreamStateHandle handle4; + try (StateBackend.CheckpointStateOutputStream stream4 = + backend.createCheckpointStateOutputStream(checkpointId, 
System.currentTimeMillis())) { + stream4.write(state4); + handle4 = stream4.closeAndGetHandle(); + } + + // close before accessing handle + StateBackend.CheckpointStateOutputStream stream5 = + backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis()); + stream5.write(state4); + stream5.close(); + try { + stream5.closeAndGetHandle(); + fail(); + } catch (IOException e) { + // uh-huh + } + + validateBytesInStream(handle1.getState(getClass().getClassLoader()), state1); + handle1.discardState(); + assertFalse(isDirectoryEmpty(checkpointDir)); + ensureLocalFileDeleted(handle1.getFilePath()); + + validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2); + handle2.discardState(); + assertFalse(isDirectoryEmpty(checkpointDir)); + ensureLocalFileDeleted(handle2.getFilePath()); + + validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3); + handle3.discardState(); + assertFalse(isDirectoryEmpty(checkpointDir)); + ensureLocalFileDeleted(handle3.getFilePath()); + + validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4); + handle4.discardState(); + assertTrue(isDirectoryEmpty(checkpointDir)); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + deleteDirectorySilently(tempDir); + } + } + + @Test + public void testKeyValueState() { + File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); + try { + FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir))); + backend.initializeForJob(new JobID()); + + File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath()); + + KvState<Integer, String, FsStateBackend> kv = + backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null); + + assertEquals(0, kv.size()); + + // some modifications to the state + kv.setCurrentKey(1); + assertNull(kv.value()); + kv.update("1"); + 
assertEquals(1, kv.size()); + kv.setCurrentKey(2); + assertNull(kv.value()); + kv.update("2"); + assertEquals(2, kv.size()); + kv.setCurrentKey(1); + assertEquals("1", kv.value()); + assertEquals(2, kv.size()); + + // draw a snapshot + KvStateSnapshot<Integer, String, FsStateBackend> snapshot1 = + kv.shapshot(682375462378L, System.currentTimeMillis()); + + // make some more modifications + kv.setCurrentKey(1); + kv.update("u1"); + kv.setCurrentKey(2); + kv.update("u2"); + kv.setCurrentKey(3); + kv.update("u3"); + + // draw another snapshot + KvStateSnapshot<Integer, String, FsStateBackend> snapshot2 = + kv.shapshot(682375462379L, System.currentTimeMillis()); + + // validate the original state + assertEquals(3, kv.size()); + kv.setCurrentKey(1); + assertEquals("u1", kv.value()); + kv.setCurrentKey(2); + assertEquals("u2", kv.value()); + kv.setCurrentKey(3); + assertEquals("u3", kv.value()); + + // restore the first snapshot and validate it + KvState<Integer, String, FsStateBackend> restored1 = snapshot1.restoreState(backend, + IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader()); + + assertEquals(2, restored1.size()); + restored1.setCurrentKey(1); + assertEquals("1", restored1.value()); + restored1.setCurrentKey(2); + assertEquals("2", restored1.value()); + + // restore the second snapshot and validate it + KvState<Integer, String, FsStateBackend> restored2 = snapshot2.restoreState(backend, + IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader()); + + assertEquals(3, restored2.size()); + restored2.setCurrentKey(1); + assertEquals("u1", restored2.value()); + restored2.setCurrentKey(2); + assertEquals("u2", restored2.value()); + restored2.setCurrentKey(3); + assertEquals("u3", restored2.value()); + + snapshot1.discardState(); + assertFalse(isDirectoryEmpty(checkpointDir)); + + snapshot2.discardState(); + assertTrue(isDirectoryEmpty(checkpointDir)); + } + catch (Exception e) { + e.printStackTrace(); +
fail(e.getMessage()); + } + finally { + deleteDirectorySilently(tempDir); + } + } + + @Test + public void testRestoreWithWrongSerializers() { + File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); + try { + FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir))); + backend.initializeForJob(new JobID()); + + File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath()); + + KvState<Integer, String, FsStateBackend> kv = + backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null); + + kv.setCurrentKey(1); + kv.update("1"); + kv.setCurrentKey(2); + kv.update("2"); + + KvStateSnapshot<Integer, String, FsStateBackend> snapshot = + kv.shapshot(682375462378L, System.currentTimeMillis()); + + + @SuppressWarnings("unchecked") + TypeSerializer<Integer> fakeIntSerializer = + (TypeSerializer<Integer>) (TypeSerializer<?>) FloatSerializer.INSTANCE; + + @SuppressWarnings("unchecked") + TypeSerializer<String> fakeStringSerializer = + (TypeSerializer<String>) (TypeSerializer<?>) new ValueSerializer<StringValue>(StringValue.class); + + try { + snapshot.restoreState(backend, fakeIntSerializer, + StringSerializer.INSTANCE, null, getClass().getClassLoader()); + fail("should recognize wrong serializers"); + } catch (IllegalArgumentException e) { + // expected + } catch (Exception e) { + fail("wrong exception"); + } + + try { + snapshot.restoreState(backend, IntSerializer.INSTANCE, + fakeStringSerializer, null, getClass().getClassLoader()); + fail("should recognize wrong serializers"); + } catch (IllegalArgumentException e) { + // expected + } catch (Exception e) { + fail("wrong exception"); + } + + try { + snapshot.restoreState(backend, fakeIntSerializer, + fakeStringSerializer, null, getClass().getClassLoader()); + fail("should recognize wrong serializers"); + } catch (IllegalArgumentException e) { + // expected + } catch (Exception e) { + 
fail("wrong exception"); + } + + snapshot.discardState(); + + assertTrue(isDirectoryEmpty(checkpointDir)); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + deleteDirectorySilently(tempDir); + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + private static void ensureLocalFileDeleted(Path path) { + URI uri = path.toUri(); + if ("file".equals(uri.getScheme())) { + File file = new File(uri.getPath()); + assertFalse("file not properly deleted", file.exists()); + } + else { + throw new IllegalArgumentException("not a local path"); + } + } + + private static void deleteDirectorySilently(File dir) { + try { + FileUtils.deleteDirectory(dir); + } + catch (IOException ignored) {} + } + + private static boolean isDirectoryEmpty(File directory) { + String[] nested = directory.list(); + return nested == null || nested.length == 0; + } + + private static String localFileUri(File path) { + return (OperatingSystem.isWindows() ? "file:/" : "file://") + path.getAbsolutePath(); + } + + private static void validateBytesInStream(InputStream is, byte[] data) throws IOException { + byte[] holder = new byte[data.length]; + assertEquals("not enough data", holder.length, is.read(holder)); + assertEquals("too much data", -1, is.read()); + assertArrayEquals("wrong data", data, holder); + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java new file mode 100644 index 0000000..5f95b33 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.FloatSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.typeutils.runtime.ValueSerializer; +import org.apache.flink.runtime.state.KvState; +import org.apache.flink.runtime.state.KvStateSnapshot; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.types.StringValue; +import org.junit.Test; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.HashMap; + +import static org.junit.Assert.*; + +/** + * Tests for the {@link org.apache.flink.runtime.state.memory.MemoryStateBackend}. 
+ */ +public class MemoryStateBackendTest { + + @Test + public void testSerializableState() { + try { + MemoryStateBackend backend = new MemoryStateBackend(); + + HashMap<String, Integer> state = new HashMap<>(); + state.put("hey there", 2); + state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77); + + StateHandle<HashMap<String, Integer>> handle = backend.checkpointStateSerializable(state, 12, 459); + assertNotNull(handle); + + HashMap<String, Integer> restored = handle.getState(getClass().getClassLoader()); + assertEquals(state, restored); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testOversizedState() { + try { + MemoryStateBackend backend = new MemoryStateBackend(10); + + HashMap<String, Integer> state = new HashMap<>(); + state.put("hey there", 2); + state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77); + + try { + backend.checkpointStateSerializable(state, 12, 459); + fail("this should cause an exception"); + } + catch (IOException e) { + // now darling, isn't that exactly what we wanted? 
+ } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testStateStream() { + try { + MemoryStateBackend backend = new MemoryStateBackend(); + + HashMap<String, Integer> state = new HashMap<>(); + state.put("hey there", 2); + state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77); + + StateBackend.CheckpointStateOutputStream os = backend.createCheckpointStateOutputStream(1, 2); + ObjectOutputStream oos = new ObjectOutputStream(os); + oos.writeObject(state); + oos.flush(); + StreamStateHandle handle = os.closeAndGetHandle(); + + assertNotNull(handle); + + ObjectInputStream ois = new ObjectInputStream(handle.getState(getClass().getClassLoader())); + assertEquals(state, ois.readObject()); + assertTrue(ois.available() <= 0); + ois.close(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testOversizedStateStream() { + try { + MemoryStateBackend backend = new MemoryStateBackend(10); + + HashMap<String, Integer> state = new HashMap<>(); + state.put("hey there", 2); + state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77); + + StateBackend.CheckpointStateOutputStream os = backend.createCheckpointStateOutputStream(1, 2); + ObjectOutputStream oos = new ObjectOutputStream(os); + + try { + oos.writeObject(state); + oos.flush(); + os.closeAndGetHandle(); + fail("this should cause an exception"); + } + catch (IOException e) { + // oh boy! what an exception! 
+ } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testKeyValueState() { + try { + MemoryStateBackend backend = new MemoryStateBackend(); + + KvState<Integer, String, MemoryStateBackend> kv = + backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null); + + assertEquals(0, kv.size()); + + // some modifications to the state + kv.setCurrentKey(1); + assertNull(kv.value()); + kv.update("1"); + assertEquals(1, kv.size()); + kv.setCurrentKey(2); + assertNull(kv.value()); + kv.update("2"); + assertEquals(2, kv.size()); + kv.setCurrentKey(1); + assertEquals("1", kv.value()); + assertEquals(2, kv.size()); + + // draw a snapshot + KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot1 = + kv.shapshot(682375462378L, System.currentTimeMillis()); + + // make some more modifications + kv.setCurrentKey(1); + kv.update("u1"); + kv.setCurrentKey(2); + kv.update("u2"); + kv.setCurrentKey(3); + kv.update("u3"); + + // draw another snapshot + KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot2 = + kv.shapshot(682375462379L, System.currentTimeMillis()); + + // validate the original state + assertEquals(3, kv.size()); + kv.setCurrentKey(1); + assertEquals("u1", kv.value()); + kv.setCurrentKey(2); + assertEquals("u2", kv.value()); + kv.setCurrentKey(3); + assertEquals("u3", kv.value()); + + // restore the first snapshot and validate it + KvState<Integer, String, MemoryStateBackend> restored1 = snapshot1.restoreState(backend, + IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader()); + + assertEquals(2, restored1.size()); + restored1.setCurrentKey(1); + assertEquals("1", restored1.value()); + restored1.setCurrentKey(2); + assertEquals("2", restored1.value()); + + // restore the second snapshot and validate it + KvState<Integer, String, MemoryStateBackend> restored2 = snapshot2.restoreState(backend, + IntSerializer.INSTANCE, StringSerializer.INSTANCE, null,
getClass().getClassLoader()); + + assertEquals(3, restored2.size()); + restored2.setCurrentKey(1); + assertEquals("u1", restored2.value()); + restored2.setCurrentKey(2); + assertEquals("u2", restored2.value()); + restored2.setCurrentKey(3); + assertEquals("u3", restored2.value()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testRestoreWithWrongSerializers() { + try { + MemoryStateBackend backend = new MemoryStateBackend(); + KvState<Integer, String, MemoryStateBackend> kv = + backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null); + + kv.setCurrentKey(1); + kv.update("1"); + kv.setCurrentKey(2); + kv.update("2"); + + KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot = + kv.shapshot(682375462378L, System.currentTimeMillis()); + + + @SuppressWarnings("unchecked") + TypeSerializer<Integer> fakeIntSerializer = + (TypeSerializer<Integer>) (TypeSerializer<?>) FloatSerializer.INSTANCE; + + @SuppressWarnings("unchecked") + TypeSerializer<String> fakeStringSerializer = + (TypeSerializer<String>) (TypeSerializer<?>) new ValueSerializer<StringValue>(StringValue.class); + + try { + snapshot.restoreState(backend, fakeIntSerializer, + StringSerializer.INSTANCE, null, getClass().getClassLoader()); + fail("should recognize wrong serializers"); + } catch (IllegalArgumentException e) { + // expected + } catch (Exception e) { + fail("wrong exception"); + } + + try { + snapshot.restoreState(backend, IntSerializer.INSTANCE, + fakeStringSerializer, null, getClass().getClassLoader()); + fail("should recognize wrong serializers"); + } catch (IllegalArgumentException e) { + // expected + } catch (Exception e) { + fail("wrong exception"); + } + + try { + snapshot.restoreState(backend, fakeIntSerializer, + fakeStringSerializer, null, getClass().getClassLoader()); + fail("should recognize wrong serializers"); + } catch (IllegalArgumentException e) { + // expected + } catch (Exception e) { + 
fail("wrong exception"); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java index d2e5b6a..a65ec01 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.testutils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; import static com.google.common.base.Preconditions.checkNotNull; @@ -79,7 +80,7 @@ public class ZooKeeperTestUtils { // File system state backend config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); - config.setString(ConfigConstants.STATE_BACKEND_FS_DIR, fsStateHandlePath + "/checkpoints"); + config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, fsStateHandlePath + "/checkpoints"); config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, fsStateHandlePath + "/recovery"); // Akka failure detection and execution retries http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java index f0130ec..788f70d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java @@ -23,7 +23,6 @@ import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.state.StateHandleProvider; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.TestLogger; import org.apache.zookeeper.CreateMode; @@ -83,11 +82,9 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { */ @Test public void testAdd() throws Exception { - // Setup - LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); - - ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider); + LongStateStorage longStateStorage = new LongStateStorage(); + ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>( + ZooKeeper.getClient(), longStateStorage); // Config final String pathInZooKeeper = "/testAdd"; @@ -98,8 +95,8 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { // Verify // State handle created - assertEquals(1, stateHandleProvider.getStateHandles().size()); - assertEquals(state, stateHandleProvider.getStateHandles().get(0).getState(null)); + assertEquals(1, store.getAll().size()); + assertEquals(state, store.get(pathInZooKeeper).getState(null)); // Path created and is persistent Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper); @@ -120,10 +117,9 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { */ @Test public void testAddWithCreateMode() throws Exception { - 
LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); - - ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider); + LongStateStorage longStateStorage = new LongStateStorage(); + ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>( + ZooKeeper.getClient(), longStateStorage); // Config Long state = 3457347234L; @@ -151,8 +147,8 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { // Verify // State handle created - assertEquals(i + 1, stateHandleProvider.getStateHandles().size()); - assertEquals(state, stateHandleProvider.getStateHandles().get(i).getState(null)); + assertEquals(i + 1, store.getAll().size()); + assertEquals(state, longStateStorage.getStateHandles().get(i).getState(null)); // Path created Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper); @@ -182,7 +178,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { */ @Test(expected = Exception.class) public void testAddAlreadyExistingPath() throws Exception { - LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeper.getClient(), stateHandleProvider); @@ -198,7 +194,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { @Test public void testAddDiscardStateHandleAfterFailure() throws Exception { // Setup - LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + LongStateStorage stateHandleProvider = new LongStateStorage(); CuratorFramework client = spy(ZooKeeper.getClient()); when(client.create()).thenThrow(new RuntimeException("Expected test Exception.")); @@ -231,7 +227,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { @Test public void testReplace() throws Exception { // Setup - LongStateHandleProvider 
stateHandleProvider = new LongStateHandleProvider(); + LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeper.getClient(), stateHandleProvider); @@ -270,10 +266,10 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { */ @Test(expected = Exception.class) public void testReplaceNonExistingPath() throws Exception { - StateHandleProvider<Long> stateHandleProvider = new LongStateHandleProvider(); + StateStorageHelper<Long> stateStorage = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider); + ZooKeeper.getClient(), stateStorage); store.replace("/testReplaceNonExistingPath", 0, 1L); } @@ -284,7 +280,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { @Test public void testReplaceDiscardStateHandleAfterFailure() throws Exception { // Setup - LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + LongStateStorage stateHandleProvider = new LongStateStorage(); CuratorFramework client = spy(ZooKeeper.getClient()); when(client.setData()).thenThrow(new RuntimeException("Expected test Exception.")); @@ -329,7 +325,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { @Test public void testGetAndExists() throws Exception { // Setup - LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeper.getClient(), stateHandleProvider); @@ -354,7 +350,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { */ @Test(expected = Exception.class) public void testGetNonExistingPath() throws Exception { - LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + LongStateStorage stateHandleProvider = new LongStateStorage(); 
ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeper.getClient(), stateHandleProvider); @@ -368,7 +364,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { @Test public void testGetAll() throws Exception { // Setup - LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeper.getClient(), stateHandleProvider); @@ -399,7 +395,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { @Test public void testGetAllSortedByName() throws Exception { // Setup - LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeper.getClient(), stateHandleProvider); @@ -429,7 +425,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { @Test public void testRemove() throws Exception { // Setup - LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeper.getClient(), stateHandleProvider); @@ -453,7 +449,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { @Test public void testRemoveWithCallback() throws Exception { // Setup - LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeper.getClient(), stateHandleProvider); @@ -492,7 +488,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { @Test public void testRemoveAndDiscardState() throws Exception { // Setup - LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider(); + LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeper.getClient(), stateHandleProvider); @@ -514,7 +510,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { @Test public void testRemoveAndDiscardAllState() throws Exception { // Setup - LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeper.getClient(), stateHandleProvider); @@ -543,21 +539,19 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { // Simple test helpers // --------------------------------------------------------------------------------------------- - private static class LongStateHandleProvider implements StateHandleProvider<Long> { - - private static final long serialVersionUID = 4572084854499402276L; + private static class LongStateStorage implements StateStorageHelper<Long> { private final List<LongStateHandle> stateHandles = new ArrayList<>(); @Override - public StateHandle<Long> createStateHandle(Long state) { + public StateHandle<Long> store(Long state) throws Exception { LongStateHandle stateHandle = new LongStateHandle(state); stateHandles.add(stateHandle); return stateHandle; } - public List<LongStateHandle> getStateHandles() { + List<LongStateHandle> getStateHandles() { return stateHandles; } } http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties index 1ca02aa..f77ed07 100644 --- a/flink-runtime/src/test/resources/log4j-test.properties +++ b/flink-runtime/src/test/resources/log4j-test.properties @@ -16,7 +16,7 @@ # 
limitations under the License. ################################################################################ -log4j.rootLogger=INFO, console +log4j.rootLogger=OFF, console # ----------------------------------------------------------------------------- # Console (use 'console') @@ -36,3 +36,4 @@ log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m # suppress the irrelevant (wrong) warnings from the netty channel handler log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console +log4j.logger.org.apache.flink.runtime.blob=DEBUG http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java index 8b7fb1c..4e4acd2 100644 --- a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java +++ b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java @@ -27,11 +27,11 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.streaming.api.state.filesystem.FileStreamStateHandle; -import org.apache.flink.streaming.api.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.streaming.api.state.StateBackend; -import org.apache.flink.streaming.api.state.StreamStateHandle; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StreamStateHandle; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 26e1c9e..98506e0 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -63,7 +63,7 @@ import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.flink.streaming.api.operators.StreamSource; -import org.apache.flink.streaming.api.state.StateBackend; +import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.streaming.api.transformations.StreamTransformation; import org.apache.flink.types.StringValue; import org.apache.flink.util.SplittableIterator; @@ -372,11 +372,11 @@ public abstract class StreamExecutionEnvironment { * the key/value state, and for checkpointed functions (implementing the interface * {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}). * - * <p>The {@link org.apache.flink.streaming.api.state.memory.MemoryStateBackend} for example + * <p>The {@link org.apache.flink.runtime.state.memory.MemoryStateBackend} for example * maintains the state in heap memory, as objects. 
It is lightweight without extra dependencies, * but can checkpoint only small states (some counters). * - * <p>In contrast, the {@link org.apache.flink.streaming.api.state.filesystem.FsStateBackend} + * <p>In contrast, the {@link org.apache.flink.runtime.state.filesystem.FsStateBackend} * stores checkpoints of the state (also maintained as heap objects) in files. When using a replicated * file system (like HDFS, S3, MapR FS, Tachyon, etc) this will guarantee that state is not lost upon * failures of individual nodes and that streaming program can be executed highly available and strongly http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java index 3817ede..3ac63af 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java @@ -26,8 +26,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; import org.apache.flink.streaming.api.checkpoint.Checkpointed; -import org.apache.flink.streaming.api.functions.source.RichSourceFunction; -import org.apache.flink.streaming.api.state.SerializedCheckpointData; +import org.apache.flink.runtime.state.SerializedCheckpointData; 
import java.util.ArrayDeque; import java.util.ArrayList; http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index 76be598..11bf84f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -31,7 +31,7 @@ import org.apache.flink.runtime.util.ClassLoaderUtil; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper; import org.apache.flink.streaming.api.operators.StreamOperator; -import org.apache.flink.streaming.api.state.StateBackend; +import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.streaming.runtime.tasks.StreamTaskException; import org.apache.flink.util.InstantiationUtil; http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 0652406..be020d7 100644 --- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -50,7 +50,7 @@ import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; -import org.apache.flink.streaming.api.state.StateBackend; +import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 9e60e9a..078679d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -24,9 +24,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.graph.StreamConfig; -import 
org.apache.flink.streaming.api.state.KvState; -import org.apache.flink.streaming.api.state.KvStateSnapshot; -import org.apache.flink.streaming.api.state.StateBackend; +import org.apache.flink.runtime.state.KvState; +import org.apache.flink.runtime.state.KvStateSnapshot; +import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java index a991fd3..17bd08d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java @@ -27,7 +27,7 @@ import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.streaming.api.state.StateBackend; +import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.StreamTaskState; 
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java deleted file mode 100644 index b974674..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.state; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataOutputView; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import static java.util.Objects.requireNonNull; - -/** - * Base class for key/value state implementations that are backed by a regular heap hash map. 
The - * concrete implementations define how the state is checkpointed. - * - * @param <K> The type of the key. - * @param <V> The type of the value. - * @param <Backend> The type of the backend that snapshots this key/value state. - */ -public abstract class AbstractHeapKvState<K, V, Backend extends StateBackend<Backend>> implements KvState<K, V, Backend> { - - /** Map containing the actual key/value pairs */ - private final HashMap<K, V> state; - - /** The serializer for the keys */ - private final TypeSerializer<K> keySerializer; - - /** The serializer for the values */ - private final TypeSerializer<V> valueSerializer; - - /** The value that is returned when no other value has been associated with a key, yet */ - private final V defaultValue; - - /** The current key, which the next value methods will refer to */ - private K currentKey; - - /** - * Creates a new empty key/value state. - * - * @param keySerializer The serializer for the keys. - * @param valueSerializer The serializer for the values. - * @param defaultValue The value that is returned when no other value has been associated with a key, yet. - */ - protected AbstractHeapKvState(TypeSerializer<K> keySerializer, - TypeSerializer<V> valueSerializer, - V defaultValue) { - this(keySerializer, valueSerializer, defaultValue, new HashMap<K, V>()); - } - - /** - * Creates a new key/value state for the given hash map of key/value pairs. - * - * @param keySerializer The serializer for the keys. - * @param valueSerializer The serializer for the values. - * @param defaultValue The value that is returned when no other value has been associated with a key, yet. - * @param state The state map to use in this kev/value state. May contain initial state. 
- */ - protected AbstractHeapKvState(TypeSerializer<K> keySerializer, - TypeSerializer<V> valueSerializer, - V defaultValue, - HashMap<K, V> state) { - this.state = requireNonNull(state); - this.keySerializer = requireNonNull(keySerializer); - this.valueSerializer = requireNonNull(valueSerializer); - this.defaultValue = defaultValue; - } - - // ------------------------------------------------------------------------ - - @Override - public V value() { - V value = state.get(currentKey); - return value != null ? value : defaultValue; - } - - @Override - public void update(V value) { - if (value != null) { - state.put(currentKey, value); - } - else { - state.remove(currentKey); - } - } - - @Override - public void setCurrentKey(K currentKey) { - this.currentKey = currentKey; - } - - @Override - public int size() { - return state.size(); - } - - @Override - public void dispose() { - state.clear(); - } - - /** - * Gets the serializer for the keys. - * @return The serializer for the keys. - */ - public TypeSerializer<K> getKeySerializer() { - return keySerializer; - } - - /** - * Gets the serializer for the values. - * @return The serializer for the values. 
- */ - public TypeSerializer<V> getValueSerializer() { - return valueSerializer; - } - - // ------------------------------------------------------------------------ - // checkpointing utilities - // ------------------------------------------------------------------------ - - protected void writeStateToOutputView(final DataOutputView out) throws IOException { - for (Map.Entry<K, V> entry : state.entrySet()) { - keySerializer.serialize(entry.getKey(), out); - valueSerializer.serialize(entry.getValue(), out); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java deleted file mode 100644 index 9c628f8..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvState.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.state; - -import org.apache.flink.api.common.state.OperatorState; - -/** - * Key/Value state implementation for user-defined state. The state is backed by a state - * backend, which typically follows one of the following patterns: Either the state is stored - * in the key/value state object directly (meaning in the executing JVM) and snapshotted by the - * state backend into some store (during checkpoints), or the key/value state is in fact backed - * by an external key/value store as the state backend, and checkpoints merely record the - * metadata of what is considered part of the checkpoint. - * - * @param <K> The type of the key. - * @param <V> The type of the value. - */ -public interface KvState<K, V, Backend extends StateBackend<Backend>> extends OperatorState<V> { - - /** - * Sets the current key, which will be used to retrieve values for the next calls to - * {@link #value()} and {@link #update(Object)}. - * - * @param key The key. - */ - void setCurrentKey(K key); - - /** - * Creates a snapshot of this state. - * - * @param checkpointId The ID of the checkpoint for which the snapshot should be created. - * @param timestamp The timestamp of the checkpoint. - * @return A snapshot handle for this key/value state. - * - * @throws Exception Exceptions during snapshotting the state should be forwarded, so the system - * can react to failed snapshots. - */ - KvStateSnapshot<K, V, Backend> shapshot(long checkpointId, long timestamp) throws Exception; - - /** - * Gets the number of key/value pairs currently stored in the state. Note that is a key - * has been associated with "null", the key is removed from the state an will not - * be counted here. - * - * @return The number of key/value pairs currently stored in the state. 
- */ - int size(); - - /** - * Disposes the key/value state, releasing all occupied resources. - */ - void dispose(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java deleted file mode 100644 index 6aa7a1e..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KvStateSnapshot.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.state; - -import org.apache.flink.api.common.typeutils.TypeSerializer; - -/** - * This class represents a snapshot of the {@link KvState}, taken for a checkpoint. 
Where exactly - * the snapshot stores the snapshot data (in this object, in an external data store, etc) depends - * on the actual implementation. This snapshot defines merely how to restore the state and - * how to discard the state. - * - * <p>One possible implementation is that this snapshot simply contains a copy of the key/value map. - * - * <p>Another possible implementation for this snapshot is that the key/value map is serialized into - * a file and this snapshot object contains a pointer to that file. - * - * @param <K> The type of the key - * @param <V> The type of the value - * @param <Backend> The type of the backend that can restore the state from this snapshot. - */ -public interface KvStateSnapshot<K, V, Backend extends StateBackend<Backend>> extends java.io.Serializable { - - /** - * Loads the key/value state back from this snapshot. - * - * - * @param stateBackend The state backend that created this snapshot and can restore the key/value state - * from this snapshot. - * @param keySerializer The serializer for the keys. - * @param valueSerializer The serializer for the values. - * @param defaultValue The value that is returned when no other value has been associated with a key, yet. - * @param classLoader The class loader for user-defined types. - * - * @return An instance of the key/value state loaded from this snapshot. - * - * @throws Exception Exceptions can occur during the state loading and are forwarded. - */ - KvState<K, V, Backend> restoreState( - Backend stateBackend, - TypeSerializer<K> keySerializer, - TypeSerializer<V> valueSerializer, - V defaultValue, - ClassLoader classLoader) throws Exception; - - - /** - * Discards the state snapshot, removing any resources occupied by it. - * - * @throws Exception Exceptions occurring during the state disposal should be forwarded. 
- */ - void discardState() throws Exception; -} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java deleted file mode 100644 index 2bbb4e2..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.state; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.util.DataInputDeserializer; -import org.apache.flink.runtime.util.DataOutputSerializer; - -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.List; - -/** - * This class represents serialized checkpoint data for a collection of elements. - */ -public class SerializedCheckpointData implements java.io.Serializable { - - private static final long serialVersionUID = -8783744683896503488L; - - /** ID of the checkpoint for which the IDs are stored */ - private final long checkpointId; - - /** The serialized elements */ - private final byte[] serializedData; - - /** The number of elements in the checkpoint */ - private final int numIds; - - /** - * Creates a SerializedCheckpointData object for the given serialized data. - * - * @param checkpointId The checkpointId of the checkpoint. - * @param serializedData The serialized IDs in this checkpoint. - * @param numIds The number of IDs in the checkpoint. - */ - public SerializedCheckpointData(long checkpointId, byte[] serializedData, int numIds) { - this.checkpointId = checkpointId; - this.serializedData = serializedData; - this.numIds = numIds; - } - - /** - * Gets the checkpointId of the checkpoint. - * @return The checkpointId of the checkpoint. - */ - public long getCheckpointId() { - return checkpointId; - } - - /** - * Gets the binary data for the serialized elements. - * @return The binary data for the serialized elements. - */ - public byte[] getSerializedData() { - return serializedData; - } - - /** - * Gets the number of IDs in the checkpoint. - * @return The number of IDs in the checkpoint. 
- */ - public int getNumIds() { - return numIds; - } - - // ------------------------------------------------------------------------ - // Serialize to Checkpoint - // ------------------------------------------------------------------------ - - /** - * Converts a list of checkpoints with elements into an array of SerializedCheckpointData. - * - * @param checkpoints The checkpoints to be converted into IdsCheckpointData. - * @param serializer The serializer to serialize the IDs. - * @param <T> The type of the ID. - * @return An array of serializable SerializedCheckpointData, one per entry in the - * - * @throws IOException Thrown, if the serialization fails. - */ - public static <T> SerializedCheckpointData[] fromDeque(ArrayDeque<Tuple2<Long, List<T>>> checkpoints, - TypeSerializer<T> serializer) throws IOException { - return fromDeque(checkpoints, serializer, new DataOutputSerializer(128)); - } - - /** - * Converts a list of checkpoints into an array of SerializedCheckpointData. - * - * @param checkpoints The checkpoints to be converted into IdsCheckpointData. - * @param serializer The serializer to serialize the IDs. - * @param outputBuffer The reusable serialization buffer. - * @param <T> The type of the ID. - * @return An array of serializable SerializedCheckpointData, one per entry in the - * - * @throws IOException Thrown, if the serialization fails. 
- */ - public static <T> SerializedCheckpointData[] fromDeque(ArrayDeque<Tuple2<Long, List<T>>> checkpoints, - TypeSerializer<T> serializer, - DataOutputSerializer outputBuffer) throws IOException { - SerializedCheckpointData[] serializedCheckpoints = new SerializedCheckpointData[checkpoints.size()]; - - int pos = 0; - for (Tuple2<Long, List<T>> checkpoint : checkpoints) { - outputBuffer.clear(); - List<T> checkpointIds = checkpoint.f1; - - for (T id : checkpointIds) { - serializer.serialize(id, outputBuffer); - } - - serializedCheckpoints[pos++] = new SerializedCheckpointData( - checkpoint.f0, outputBuffer.getCopyOfBuffer(), checkpointIds.size()); - } - - return serializedCheckpoints; - } - - // ------------------------------------------------------------------------ - // De-Serialize from Checkpoint - // ------------------------------------------------------------------------ - - /** - * De-serializes an array of SerializedCheckpointData back into an ArrayDeque of element checkpoints. - * - * @param data The data to be deserialized. - * @param serializer The serializer used to deserialize the data. - * @param <T> The type of the elements. - * @return An ArrayDeque of element checkpoints. - * - * @throws IOException Thrown, if the serialization fails. 
- */ - public static <T> ArrayDeque<Tuple2<Long, List<T>>> toDeque( - SerializedCheckpointData[] data, TypeSerializer<T> serializer) throws IOException - { - ArrayDeque<Tuple2<Long, List<T>>> deque = new ArrayDeque<>(data.length); - DataInputDeserializer deser = null; - - for (SerializedCheckpointData checkpoint : data) { - byte[] serializedData = checkpoint.getSerializedData(); - if (deser == null) { - deser = new DataInputDeserializer(serializedData, 0, serializedData.length); - } - else { - deser.setBuffer(serializedData, 0, serializedData.length); - } - - final List<T> ids = new ArrayList<>(checkpoint.getNumIds()); - final int numIds = checkpoint.getNumIds(); - - for (int i = 0; i < numIds; i++) { - ids.add(serializer.deserialize(deser)); - } - - deque.addLast(new Tuple2<Long, List<T>>(checkpoint.checkpointId, ids)); - } - - return deque; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java deleted file mode 100644 index f4391ad..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.state; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.state.StateHandle; - -import java.io.IOException; -import java.io.OutputStream; -import java.io.Serializable; - -/** - * A state backend defines how state is stored and snapshotted during checkpoints. - * - * @param <Backend> The type of backend itself. This generic parameter is used to refer to the - * type of backend when creating state backed by this backend. - */ -public abstract class StateBackend<Backend extends StateBackend<Backend>> implements java.io.Serializable { - - private static final long serialVersionUID = 4620413814639220247L; - - // ------------------------------------------------------------------------ - // initialization and cleanup - // ------------------------------------------------------------------------ - - /** - * This method is called by the task upon deployment to initialize the state backend for - * data for a specific job. - * - * @param job The ID of the job for which the state backend instance checkpoints data. - * @throws Exception Overwritten versions of this method may throw exceptions, in which - * case the job that uses the state backend is considered failed during - * deployment. 
- */ - public abstract void initializeForJob(JobID job) throws Exception; - - /** - * Disposes all state associated with the current job. - * - * @throws Exception Exceptions may occur during disposal of the state and should be forwarded. - */ - public abstract void disposeAllStateForCurrentJob() throws Exception; - - /** - * Closes the state backend, releasing all internal resources, but does not delete any persistent - * checkpoint data. - * - * @throws Exception Exceptions can be forwarded and will be logged by the system - */ - public abstract void close() throws Exception; - - // ------------------------------------------------------------------------ - // key/value state - // ------------------------------------------------------------------------ - - /** - * Creates a key/value state backed by this state backend. - * - * @param keySerializer The serializer for the key. - * @param valueSerializer The serializer for the value. - * @param defaultValue The value that is returned when no other value has been associated with a key, yet. - * @param <K> The type of the key. - * @param <V> The type of the value. - * - * @return A new key/value state backed by this backend. - * - * @throws Exception Exceptions may occur during initialization of the state and should be forwarded. - */ - public abstract <K, V> KvState<K, V, Backend> createKvState( - TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, - V defaultValue) throws Exception; - - - // ------------------------------------------------------------------------ - // storing state for a checkpoint - // ------------------------------------------------------------------------ - - /** - * Creates an output stream that writes into the state of the given checkpoint. When the stream - * is closes, it returns a state handle that can retrieve the state back. - * - * @param checkpointID The ID of the checkpoint. - * @param timestamp The timestamp of the checkpoint. 
- * @return An output stream that writes state for the given checkpoint. - * - * @throws Exception Exceptions may occur while creating the stream and should be forwarded. - */ - public abstract CheckpointStateOutputStream createCheckpointStateOutputStream( - long checkpointID, long timestamp) throws Exception; - - /** - * Creates a {@link DataOutputView} stream that writes into the state of the given checkpoint. - * When the stream is closes, it returns a state handle that can retrieve the state back. - * - * @param checkpointID The ID of the checkpoint. - * @param timestamp The timestamp of the checkpoint. - * @return An DataOutputView stream that writes state for the given checkpoint. - * - * @throws Exception Exceptions may occur while creating the stream and should be forwarded. - */ - public CheckpointStateOutputView createCheckpointStateOutputView( - long checkpointID, long timestamp) throws Exception { - return new CheckpointStateOutputView(createCheckpointStateOutputStream(checkpointID, timestamp)); - } - - /** - * Writes the given state into the checkpoint, and returns a handle that can retrieve the state back. - * - * @param state The state to be checkpointed. - * @param checkpointID The ID of the checkpoint. - * @param timestamp The timestamp of the checkpoint. - * @param <S> The type of the state. - * - * @return A state handle that can retrieve the checkpoined state. - * - * @throws Exception Exceptions may occur during serialization / storing the state and should be forwarded. - */ - public abstract <S extends Serializable> StateHandle<S> checkpointStateSerializable( - S state, long checkpointID, long timestamp) throws Exception; - - - // ------------------------------------------------------------------------ - // Checkpoint state output stream - // ------------------------------------------------------------------------ - - /** - * A dedicated output stream that produces a {@link StreamStateHandle} when closed. 
- */ - public static abstract class CheckpointStateOutputStream extends OutputStream { - - /** - * Closes the stream and gets a state handle that can create an input stream - * producing the data written to this stream. - * - * @return A state handle that can create an input stream producing the data written to this stream. - * @throws IOException Thrown, if the stream cannot be closed. - */ - public abstract StreamStateHandle closeAndGetHandle() throws IOException; - } - - /** - * A dedicated DataOutputView stream that produces a {@code StateHandle<DataInputView>} when closed. - */ - public static final class CheckpointStateOutputView extends DataOutputViewStreamWrapper { - - private final CheckpointStateOutputStream out; - - public CheckpointStateOutputView(CheckpointStateOutputStream out) { - super(out); - this.out = out; - } - - /** - * Closes the stream and gets a state handle that can create a DataInputView. - * producing the data written to this stream. - * - * @return A state handle that can create an input stream producing the data written to this stream. - * @throws IOException Thrown, if the stream cannot be closed. - */ - public StateHandle<DataInputView> closeAndGetHandle() throws IOException { - return new DataInputViewHandle(out.closeAndGetHandle()); - } - - @Override - public void close() throws IOException { - out.close(); - } - } - - /** - * Simple state handle that resolved a {@link DataInputView} from a StreamStateHandle. 
- */ - private static final class DataInputViewHandle implements StateHandle<DataInputView> { - - private static final long serialVersionUID = 2891559813513532079L; - - private final StreamStateHandle stream; - - private DataInputViewHandle(StreamStateHandle stream) { - this.stream = stream; - } - - @Override - public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception { - return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader)); - } - - @Override - public void discardState() throws Exception { - stream.discardState(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java deleted file mode 100644 index ad87eae..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackendFactory.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.state; - -import org.apache.flink.configuration.Configuration; - -/** - * A factory to create a specific state backend. The state backend creation gets a Configuration - * object that can be used to read further config values. - * - * @param <T> The type of the state backend created. - */ -public interface StateBackendFactory<T extends StateBackend<T>> { - - /** - * Creates the state backend, optionally using the given configuration. - * - * @param config The Flink configuration (loaded by the TaskManager). - * @return The created state backend. - * - * @throws Exception Exceptions during instantiation can be forwarded. 
- */ - StateBackend<T> createFromConfig(Configuration config) throws Exception; -} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamStateHandle.java deleted file mode 100644 index 0fa5952..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamStateHandle.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.state; - -import org.apache.flink.runtime.state.StateHandle; - -import java.io.InputStream; - -/** - * A state handle that produces an input stream when resolved. 
- */ -public interface StreamStateHandle extends StateHandle<InputStream> {} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/AbstractFileState.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/AbstractFileState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/AbstractFileState.java deleted file mode 100644 index c4a376e..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/AbstractFileState.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.state.filesystem; - -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; - -import java.io.IOException; - -/** - * Base class for state that is stored in a file. 
- */ -public abstract class AbstractFileState implements java.io.Serializable { - - private static final long serialVersionUID = 350284443258002355L; - - /** The path to the file in the filesystem, fully describing the file system */ - private final Path filePath; - - /** Cached file system handle */ - private transient FileSystem fs; - - /** - * Creates a new file state for the given file path. - * - * @param filePath The path to the file that stores the state. - */ - protected AbstractFileState(Path filePath) { - this.filePath = filePath; - } - - /** - * Gets the path where this handle's state is stored. - * @return The path where this handle's state is stored. - */ - public Path getFilePath() { - return filePath; - } - - /** - * Discard the state by deleting the file that stores the state. If the parent directory - * of the state is empty after deleting the state file, it is also deleted. - * - * @throws Exception Thrown, if the file deletion (not the directory deletion) fails. - */ - public void discardState() throws Exception { - getFileSystem().delete(filePath, false); - - // send a call to delete the directory containing the file. this will - // fail (and be ignored) when some files still exist - try { - getFileSystem().delete(filePath.getParent(), false); - } catch (IOException ignored) {} - } - - /** - * Gets the file system that stores the file state. - * @return The file system that stores the file state. - * @throws IOException Thrown if the file system cannot be accessed. 
- */ - protected FileSystem getFileSystem() throws IOException { - if (fs == null) { - fs = FileSystem.get(filePath.toUri()); - } - return fs; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileSerializableStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileSerializableStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileSerializableStateHandle.java deleted file mode 100644 index 9bf5ec1..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileSerializableStateHandle.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.state.filesystem; - -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.util.InstantiationUtil; - -import java.io.ObjectInputStream; - -/** - * A state handle that points to state stored in a file via Java Serialization. - * - * @param <T> The type of state pointed to by the state handle. - */ -public class FileSerializableStateHandle<T> extends AbstractFileState implements StateHandle<T> { - - private static final long serialVersionUID = -657631394290213622L; - - /** - * Creates a new FileSerializableStateHandle pointing to state at the given file path. - * - * @param filePath The path to the file containing the checkpointed state. - */ - public FileSerializableStateHandle(Path filePath) { - super(filePath); - } - - @Override - @SuppressWarnings("unchecked") - public T getState(ClassLoader classLoader) throws Exception { - FSDataInputStream inStream = getFileSystem().open(getFilePath()); - ObjectInputStream ois = new InstantiationUtil.ClassLoaderObjectInputStream(inStream, classLoader); - return (T) ois.readObject(); - } -}
