sjwiesman commented on a change in pull request #13309: URL: https://github.com/apache/flink/pull/13309#discussion_r483299944
########## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyMapFunction.java ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api.functions; Review comment: This package is for user interfaces. This is an internal class and should be in the `output` package. Same with the other file. ########## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/StatePathExtractor.java ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api.functions; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.OperatorState; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.util.Collector; + +/** + * Extracts all file paths that are part of the provided {@link OperatorState}. + */ +public class StatePathExtractor implements FlatMapFunction<OperatorState, String> { Review comment: Mark as @Internal ########## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyMapFunction.java ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api.functions; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.nio.file.Files; +import java.nio.file.Paths; + +/** + * This mapper copies files from an existing savepoint into a new directory. + */ +public final class FileCopyMapFunction implements MapFunction<String, String> { Review comment: Internal class with public scope should be marked @Internal ########## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/FileCopyMapFunction.java ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.api.functions; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.nio.file.Files; +import java.nio.file.Paths; + +/** + * This mapper copies files from an existing savepoint into a new directory. + */ +public final class FileCopyMapFunction implements MapFunction<String, String> { + + private static final long serialVersionUID = 1L; + + // the destination path to copy file + private final String path; + + public FileCopyMapFunction(String path) { + this.path = Preconditions.checkNotNull(path, "The destination path cannot be null"); + } + + @Override + public String map(String sourceFile) throws Exception { + // Create the destination directory before copying. It is not a problem if it exists already. + Path destPath = new Path(path); + destPath.getFileSystem().mkdirs(destPath); + + Files.copy( + Paths.get(sourceFile), // source file + Paths.get(path, Paths.get(sourceFile).getFileName().toString()) // destination file + ); + return sourceFile; Review comment: This doesn't work: you're using the Java filesystem API (`java.nio.file.Files`/`Paths`) when you need to be using Flink's `FileSystem` abstraction (`org.apache.flink.core.fs`). ########## File path: flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java ########## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api; + +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction; +import org.apache.flink.state.api.functions.KeyedStateReaderFunction; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.Collector; + +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collection; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.isIn; +import static org.junit.Assert.assertThat; + +/** + * Test the savepoint deep copy. 
+ */ +@RunWith(value = Parameterized.class) +public class SavepointDeepCopyTest extends AbstractTestBase { + + private static final String TEXT = "The quick brown fox jumps over the lazy dog"; + private static final String RANDOM_VALUE = RandomStringUtils.randomAlphanumeric(120); + + private final StateBackend backend; + + public SavepointDeepCopyTest(StateBackend backend) throws Exception { + this.backend = backend; + //reset the cluster so we can change the state backend + miniClusterResource.after(); + miniClusterResource.before(); + } + + @Parameterized.Parameters(name = "State Backend: {0}") + public static Collection<StateBackend> data() { + // set the threshold to 1024 bytes to allow generate additional state data files with a small state + int fileStateSizeThreshold = 1024; Review comment: constants should be static final ########## File path: flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java ########## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.api; + +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction; +import org.apache.flink.state.api.functions.KeyedStateReaderFunction; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.Collector; + +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collection; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.isIn; +import static org.junit.Assert.assertThat; + +/** + * Test the savepoint deep copy. 
+ */ +@RunWith(value = Parameterized.class) +public class SavepointDeepCopyTest extends AbstractTestBase { + + private static final String TEXT = "The quick brown fox jumps over the lazy dog"; + private static final String RANDOM_VALUE = RandomStringUtils.randomAlphanumeric(120); + + private final StateBackend backend; + + public SavepointDeepCopyTest(StateBackend backend) throws Exception { + this.backend = backend; + //reset the cluster so we can change the state backend + miniClusterResource.after(); + miniClusterResource.before(); + } + + @Parameterized.Parameters(name = "State Backend: {0}") + public static Collection<StateBackend> data() { + // set the threshold to 1024 bytes to allow generate additional state data files with a small state + int fileStateSizeThreshold = 1024; + return Arrays.asList( + new FsStateBackend(new Path("file:///tmp").toUri(), fileStateSizeThreshold), + new RocksDBStateBackend( + (StateBackend) new FsStateBackend(new Path("file:///tmp").toUri(), fileStateSizeThreshold) + ) + ); + } + + /** + * To bootstrapper a savepoint for testing. + */ + static class WordMapBootstrapper extends KeyedStateBootstrapFunction<String, String> { + private ValueState<Tuple2<String, String>> state; + + @Override + public void open(Configuration parameters) { + ValueStateDescriptor<Tuple2<String, String>> descriptor = new ValueStateDescriptor<>( + "state", Types.TUPLE(Types.STRING, Types.STRING)); + state = getRuntimeContext().getState(descriptor); + } + + @Override + public void processElement(String value, Context ctx) throws Exception { + if (state.value() == null) { + state.update(new Tuple2<>(value, RANDOM_VALUE)); + } + } + } + + /** + * To read the state back from the newly created savepoint. 
+ */ + static class ReadFunction extends KeyedStateReaderFunction<String, Tuple2<String, String>> { + + private ValueState<Tuple2<String, String>> state; + + @Override + public void open(Configuration parameters) { + ValueStateDescriptor<Tuple2<String, String>> stateDescriptor = new ValueStateDescriptor<>( + "state", Types.TUPLE(Types.STRING, Types.STRING)); + state = getRuntimeContext().getState(stateDescriptor); + } + + @Override + public void readKey( + String key, + Context ctx, + Collector<Tuple2<String, String>> out) throws Exception { + out.collect(state.value()); + } + } + + /** + * Test savepoint deep copy. This method tests the savepoint deep copy by: + * <ul> + * <li>create {@code savepoint1} with operator {@code Operator1}, make sure it has more state files in addition to + * _metadata + * <li>create {@code savepoint2} from {@code savepoint1} by adding a new operator {@code Operator2} + * <li>check all state files in {@code savepoint1}'s directory are copied over to {@code savepoint2}'s directory + * <li>read the state of {@code Operator1} from {@code savepoint2} and make sure the number of the keys remain same + * </ul> + * @throws Exception throw exceptions when anything goes wrong + */ + + @Test + public void testSavepointDeepCopy() throws Exception { + + // set up the execution environment + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // construct DataSet + DataSet<String> words = env.fromElements(TEXT.split(" ")); + + // create BootstrapTransformation + BootstrapTransformation<String> transformation = OperatorTransformation + .bootstrapWith(words) + .keyBy(e -> e) + .transform(new WordMapBootstrapper()); + + File savepointUrl1 = createAndRegisterTempFile(new AbstractID().toHexString()); + String savepointPath1 = savepointUrl1.getPath(); + + // create a savepoint with BootstrapTransformations (one per operator) + // write the created savepoint to a given path + Savepoint.create(backend, 128) + 
.withOperator("Operator1", transformation) + .write(savepointPath1); + + env.execute("bootstrap savepoint1"); + + Assert.assertTrue( + "Failed to bootstrap savepoint1 with additional state files", + Files.list(Paths.get(savepointPath1)).count() > 1 + ); + + Set<String> stateFiles1 = Files.list(Paths.get(savepointPath1)) + .map(path -> path.getFileName().toString()) + .collect(Collectors.toSet()); + + // create savepoint2 from savepoint1 created above + File savepointUrl2 = createAndRegisterTempFile(new AbstractID().toHexString()); + String savepointPath2 = savepointUrl2.getPath(); + + ExistingSavepoint savepoint2 = Savepoint.load(env, savepointPath1, backend); + savepoint2 + .withOperator("Operator2", transformation) + .write(savepointPath2); + env.execute("create savepoint2"); + + Assert.assertTrue("Failed to create savepoint2 from savepoint1 with additional state files", + Files.list(Paths.get(savepointPath2)).count() > 1 + ); + + Set<String> stateFiles2 = Files.list(Paths.get(savepointPath2)) + .map(path -> path.getFileName().toString()) + .collect(Collectors.toSet()); + + // check every state file in savepoint1 are also in savepoint2 Review comment: If you're adding a comment to an Assert then it should be in the error message. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
