http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java index 9b33c6a..c586db3 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java @@ -28,8 +28,8 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; -import org.apache.flink.streaming.api.state.StateBackend; -import org.apache.flink.streaming.api.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask;
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 29bf938..e953696 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -26,7 +26,7 @@ import org.apache.flink.api.common.io.{FileInputFormat, InputFormat} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer import org.apache.flink.api.scala.ClosureCleaner -import org.apache.flink.streaming.api.state.StateBackend +import org.apache.flink.runtime.state.StateBackend import org.apache.flink.streaming.api.{TimeCharacteristic, CheckpointingMode} import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv} import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java index 989db14..3b5295f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java @@ -23,7 +23,7 @@ import java.io.File; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; import org.apache.flink.test.testdata.KMeansData; import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.junit.Rule; http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java index 2cdf83c..f15644e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.leaderelection.TestingListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.testutils.JobManagerActorTestUtils; import org.apache.flink.runtime.testutils.JobManagerProcess; @@ -542,7 +543,7 @@ public class ChaosMonkeyITCase { LOG.info("Checking file system backend state..."); - File fsCheckpoints = new File(config.getString(ConfigConstants.STATE_BACKEND_FS_DIR, "")); + File fsCheckpoints = new File(config.getString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, "")); LOG.info("Checking " + fsCheckpoints); http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java index 54ddf7e..e590067 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java @@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; @@ -68,7 +69,7 @@ import static org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; -public class JobManagerCheckpointRecoveryITCase { +public class JobManagerCheckpointRecoveryITCase extends TestLogger { private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1); @@ -144,7 +145,7 @@ public class JobManagerCheckpointRecoveryITCase { JobGraph jobGraph = env.getStreamGraph().getJobGraph(); Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(ZooKeeper - .getConnectString(), FileStateBackendBasePath.getPath()); + .getConnectString(), FileStateBackendBasePath.getAbsoluteFile().toURI().toString()); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Parallelism); ActorSystem testSystem = null; http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java index aa634f0..63aa967 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java @@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.api.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.junit.Assert; http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java index 5c7a932..5840a98 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java @@ -95,7 +95,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem"); - configuration.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, tempDirectory.getPath()); + configuration.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, tempDirectory.getAbsoluteFile().toURI().toString()); ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration); @@ -144,7 +144,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM); configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem"); - configuration.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, tempDirectory.getPath()); + configuration.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, tempDirectory.getAbsoluteFile().toURI().toString()); // we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make // sure that all TMs have registered to the JM prior to issueing the RecoverAllJobs message http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-tests/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/resources/log4j-test.properties b/flink-tests/src/test/resources/log4j-test.properties index 3ba6d1d..62807b3 100644 --- a/flink-tests/src/test/resources/log4j-test.properties +++ b/flink-tests/src/test/resources/log4j-test.properties @@ -19,7 +19,6 @@ # Set root logger level to OFF to not flood build logs # set manually to INFO for debugging purposes log4j.rootLogger=INFO, testlogger -log4j.logger.org.apache.flink.runtime.client.JobClientActor=DEBUG # A1 is set to be a ConsoleAppender. log4j.appender.testlogger=org.apache.log4j.ConsoleAppender http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java index a05621a..ffb43f8 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.Messages; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; @@ -114,7 +115,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase { flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@ha.zookeeper.quorum=" + zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts + "@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" + - "@@" + ConfigConstants.STATE_BACKEND_FS_DIR + "=" + fsStateHandlePath + "/checkpoints" + + "@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" + "@@" + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "=" + fsStateHandlePath + "/recovery"); flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml")); http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala index 84198ec..f3892dd 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala @@ -123,7 +123,7 @@ abstract class ApplicationMasterBase { config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); // set port to 0. } - val (actorSystem, jmActor, archivActor, webMonitor) = + val (actorSystem, jmActor, archiveActor, webMonitor) = JobManager.startActorSystemAndJobManagerActors( config, JobManagerMode.CLUSTER, http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 47c47c5..efe2101 100644 --- a/pom.xml +++ b/pom.xml @@ -735,7 +735,7 @@ under the License. <exclude>flink-runtime-web/web-dashboard/assets/fonts/fontawesome*</exclude> <!-- generated contents --> - <exclude>flink-runtime-web/src/main/resources/web/**</exclude> + <exclude>flink-runtime-web/web-dashboard/web/**</exclude> <!-- downloaded and generated web libraries. --> <exclude>flink-runtime-web/web-dashboard/node_modules/**</exclude>
