zentol commented on a change in pull request #18083:
URL: https://github.com/apache/flink/pull/18083#discussion_r783911518
##########
File path:
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
##########
@@ -547,6 +547,17 @@
code(SchedulerType.AdaptiveBatch.name()))
.build());
+ /**
+ * The JobManager's ResourceID. If not configured, the ResourceID will be generated randomly.
+ */
+ @Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
+ public static final ConfigOption<String> JOB_MANAGER_RESOURCE_ID =
+ key("jobmanager.resource-id")
Review comment:
I'm wondering if we should expose the concept of resource IDs here.
Maybe we could do something more generic like "process id" or a plain
"jobmanager id" or something. Ideally whatever we expose here eventually also
finds its way into the metric system.
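For reference, the option's documented semantics (use the configured value, otherwise generate a random ID) amount to something like the minimal sketch below. It is an assumption-laden illustration, not Flink code: a plain `Map` stands in for `Configuration`, `UUID` stands in for `ResourceID.generate()`, and the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/** Hypothetical helper illustrating the option's semantics; not part of Flink. */
final class JobManagerIdSketch {
    static final String RESOURCE_ID_KEY = "jobmanager.resource-id";

    private JobManagerIdSketch() {}

    /** Returns the configured ID, or a freshly generated random one if the key is unset. */
    static String resolveResourceId(Map<String, String> config) {
        return Optional.ofNullable(config.get(RESOURCE_ID_KEY))
                // UUID stands in for ResourceID.generate(); the real ID format may differ.
                .orElseGet(() -> UUID.randomUUID().toString());
    }
}
```

Whatever name the option ends up with (resource ID, process ID, JobManager ID), the lookup-or-generate behaviour stays the same; only the key changes.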
##########
File path:
flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
##########
@@ -157,6 +158,47 @@
UncaughtExceptionHandleMode.LOG.name(),
UncaughtExceptionHandleMode.FAIL.name()));
+ @Documentation.OverrideDefault("io.tmp.dirs")
+ @Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
+ public static final ConfigOption<String> PROCESS_WORKING_DIR_BASE =
+ ConfigOptions.key("process.working-dir")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Working directory for Flink processes. "
+ + "The working directory can be used to store information that can be used upon process recovery. "
+ + "If the not configured, then it will default to the first temporary directory defined via %s.",
Review comment:
```suggestion
+ "If not configured, then it will default to the first temporary directory defined via %s.",
```
Same issue applies to other options.
##########
File path:
flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
##########
@@ -157,6 +158,47 @@
UncaughtExceptionHandleMode.LOG.name(),
UncaughtExceptionHandleMode.FAIL.name()));
+ @Documentation.OverrideDefault("io.tmp.dirs")
+ @Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
+ public static final ConfigOption<String> PROCESS_WORKING_DIR_BASE =
+ ConfigOptions.key("process.working-dir")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Working directory for Flink processes. "
+ + "The working directory can be used to store information that can be used upon process recovery. "
+ + "If the not configured, then it will default to the first temporary directory defined via %s.",
+ TextElement.code(CoreOptions.TMP_DIRS.key()))
+ .build());
+
+ @Documentation.OverrideDefault("process.working-dir")
+ @Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
+ public static final ConfigOption<String> JOB_MANAGER_PROCESS_WORKING_DIR_BASE =
+ ConfigOptions.key("process.jobmanager.working-dir")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Working directory for the JobManager process. The working directory can be used to store information that can be used upon process recovery. If the not configured, then it will default to %s.",
Review comment:
```suggestion
"Working directory for Flink JobManager processes. The working directory can be used to store information that can be used upon process recovery. If not configured, then it will default to %s.",
```
Just for consistency with the generic option.
##########
File path: docs/layouts/shortcodes/generated/all_taskmanager_section.html
##########
@@ -8,12 +8,6 @@
</tr>
</thead>
<tbody>
- <tr>
- <td><h5>jobmanager.resource-id</h5></td>
Review comment:
nit: wrong commit
##########
File path:
flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
##########
@@ -73,6 +74,25 @@
return splitPaths(configuration.getString(CoreOptions.TMP_DIRS));
}
+ /**
+ * Extracts the first temporary directory from the given configuration.
+ *
+ * @param configuration to extract the temp directory from
+ * @return the first temporary directory
+ */
+ @Nonnull
+ public static File getFirstTempDirectory(Configuration configuration) {
+ final String[] tmpDirectories = splitPaths(configuration.getString(CoreOptions.TMP_DIRS));
Review comment:
```suggestion
final String[] tmpDirectories = parseTempDirectories(configuration);
```
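A self-contained sketch of what `getFirstTempDirectory` computes, with the parsing kept in a single helper as the suggestion proposes. It assumes `io.tmp.dirs` holds a list separated by `,` or the platform path separator; the class name and the exception thrown for an empty list are illustrative, not Flink's.

```java
import java.io.File;
import java.util.regex.Pattern;

/** Hypothetical stand-in for the temp-directory helpers in ConfigurationUtils. */
final class TempDirectories {
    private TempDirectories() {}

    /** Splits the configured value on ',' or the platform path separator (assumed format). */
    static String[] parseTempDirectories(String configuredValue) {
        if (configuredValue == null || configuredValue.isEmpty()) {
            return new String[0];
        }
        return configuredValue.split(",|" + Pattern.quote(File.pathSeparator));
    }

    /** Returns the first temporary directory, reusing the single parsing helper. */
    static File getFirstTempDirectory(String configuredValue) {
        final String[] tmpDirectories = parseTempDirectories(configuredValue);
        if (tmpDirectories.length == 0) {
            throw new IllegalStateException("No temporary directory configured.");
        }
        return new File(tmpDirectories[0]);
    }
}
```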
##########
File path: docs/layouts/shortcodes/generated/all_taskmanager_section.html
##########
@@ -8,6 +8,12 @@
</tr>
</thead>
<tbody>
+ <tr>
+ <td><h5>jobmanager.resource-id</h5></td>
Review comment:
Why does `jobmanager.resource-id` show up in the TaskManager section?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/WorkingDirectory.java
##########
@@ -29,20 +29,32 @@
*/
public class WorkingDirectory {
private final File root;
+ private final File tmp;
private WorkingDirectory(File root) throws IOException {
this.root = root;
+ createDirectory(root);
- if (!root.mkdirs() && !root.exists()) {
+ this.tmp = new File(root, "tmp");
+ createDirectory(tmp);
+ FileUtils.cleanDirectory(tmp);
Review comment:
Are there any safeguards ensuring that the WorkingDirectory is not shared across
(active) processes?
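One possible safeguard, sketched under the assumption that all Flink processes cooperate: hold an exclusive `java.nio` file lock on a marker file inside the working directory for the lifetime of the process, and refuse to start if it is already held. `WorkingDirectoryLock` and the `.lock` file name are hypothetical. Whether such a lock stops other programs from touching the files is platform-dependent; it only reliably guards against processes that take the same lock.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical guard that claims a working directory for a single process. */
final class WorkingDirectoryLock implements AutoCloseable {
    private final FileChannel channel;
    private final FileLock lock;

    private WorkingDirectoryLock(FileChannel channel, FileLock lock) {
        this.channel = channel;
        this.lock = lock;
    }

    /** Locks "<workingDir>/.lock" or fails if another holder already owns it. */
    static WorkingDirectoryLock acquire(Path workingDir) {
        try {
            Files.createDirectories(workingDir);
            final FileChannel channel =
                    FileChannel.open(
                            workingDir.resolve(".lock"),
                            StandardOpenOption.CREATE,
                            StandardOpenOption.WRITE);
            FileLock lock = null;
            try {
                // Returns null if another process holds the lock.
                lock = channel.tryLock();
            } catch (OverlappingFileLockException e) {
                // Another channel in this JVM already holds the lock.
            } finally {
                if (lock == null) {
                    channel.close();
                }
            }
            if (lock == null) {
                throw new IllegalStateException(
                        "Working directory " + workingDir + " is already in use.");
            }
            return new WorkingDirectoryLock(channel, lock);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        try {
            lock.release();
            channel.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```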
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
##########
@@ -558,9 +587,29 @@ private Configuration generateClusterConfiguration(Configuration configuration)
* @throws IOException if the temporary directories could not be cleaned up
*/
protected void cleanupDirectories() throws IOException {
+ IOException ioException = null;
+
final String webTmpDir = configuration.getString(WebOptions.TMP_DIR);
Review comment:
The fact that this directory is also not being cleaned up seems
incorrect; shouldn't that behavior be limited to the working directory? Either
this directory should be part of the working directory, or it should always be
cleaned up.
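If the web tmp dir is to be cleaned up unconditionally, the cleanup should keep going after a single failure so that one bad directory does not leave the others behind. A minimal sketch using only `java.nio.file`: it rethrows the first `IOException` and attaches later ones as suppressed. The class name is hypothetical, and whether the PR's `ioException` variable is meant to be used this way is an assumption.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical cleanup helper; not Flink's implementation. */
final class DirectoryCleanup {
    private DirectoryCleanup() {}

    /** Recursively deletes dir if it exists. */
    static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse lexicographic order visits children before their parents.
            List<Path> paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            for (Path path : paths) {
                Files.delete(path);
            }
        }
    }

    /** Attempts every directory; rethrows the first failure with the others suppressed. */
    static void cleanupAll(List<Path> dirs) throws IOException {
        IOException ioException = null;
        for (Path dir : dirs) {
            try {
                deleteRecursively(dir);
            } catch (IOException e) {
                if (ioException == null) {
                    ioException = e;
                } else {
                    ioException.addSuppressed(e);
                }
            }
        }
        if (ioException != null) {
            throw ioException;
        }
    }
}
```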
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/WorkingDirectory.java
##########
@@ -38,6 +39,9 @@ private WorkingDirectory(File root) throws IOException {
this.tmp = new File(root, "tmp");
createDirectory(tmp);
FileUtils.cleanDirectory(tmp);
+
+ localState = new File(root, "localState");
Review comment:
Wondering if this should be somehow connected to
`TaskManagerServices#LOCAL_STATE_SUB_DIRECTORY_ROOT`.
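To make the layout in the diff concrete, here is a simplified, hypothetical model of the `WorkingDirectory`: `tmp` is emptied on every start while `localState` survives. The `LOCAL_STATE_SUB_DIRECTORY_ROOT` constant is defined locally only to illustrate sharing one name; if the two were connected, the working directory could reference `TaskManagerServices#LOCAL_STATE_SUB_DIRECTORY_ROOT` instead of hard-coding `"localState"`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical, simplified model of the WorkingDirectory in this PR. */
final class WorkingDirectorySketch {
    // Local stand-in for a shared constant such as
    // TaskManagerServices#LOCAL_STATE_SUB_DIRECTORY_ROOT.
    static final String LOCAL_STATE_SUB_DIRECTORY_ROOT = "localState";

    private final Path tmp;
    private final Path localState;

    private WorkingDirectorySketch(Path root) throws IOException {
        Files.createDirectories(root);
        // tmp is emptied on every start, so nothing in it survives a restart.
        this.tmp = Files.createDirectories(root.resolve("tmp"));
        cleanDirectory(tmp);
        // localState is left untouched so a restarted process can recover from it.
        this.localState = Files.createDirectories(root.resolve(LOCAL_STATE_SUB_DIRECTORY_ROOT));
    }

    static WorkingDirectorySketch create(Path root) {
        try {
            return new WorkingDirectorySketch(root);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    Path getTmpDirectory() {
        return tmp;
    }

    Path getLocalStateDirectory() {
        return localState;
    }

    /** Deletes the contents of dir but keeps dir itself. */
    private static void cleanDirectory(Path dir) throws IOException {
        try (Stream<Path> walk = Files.walk(dir)) {
            List<Path> contents =
                    walk.filter(path -> !path.equals(dir))
                            .sorted(Comparator.reverseOrder())
                            .collect(Collectors.toList());
            for (Path path : contents) {
                Files.delete(path);
            }
        }
    }
}
```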
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
##########
@@ -40,7 +43,14 @@
.noDefaultValue()
.withDeprecatedKeys("state.backend.rocksdb.checkpointdir")
.withDescription(
- "The local directory (on the TaskManager) where RocksDB puts its files.");
+ Description.builder()
+ .text(
+ "The local directory (on the TaskManager) where RocksDB puts its files. Per default, it will be <WORKING_DIR>/tmp. See %s for more details.",
Review comment:
If `<workingDir>/tmp` is always cleaned when the process starts, then
what do we gain by putting RocksDB files in there? I don't quite understand the
justification in the FLIP.
##########
File path:
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
##########
@@ -164,9 +164,14 @@
ConfigOptions.key("taskmanager.state.local.root-dirs")
.noDefaultValue()
.withDescription(
- "The config parameter defining the root directories for storing file-based state for local "
- + "recovery. Local recovery currently only covers keyed state backends. Currently, MemoryStateBackend does "
- + "not support local recovery and ignore this option");
+ Description.builder()
Review comment:
Should this option be deprecated?
##########
File path: docs/layouts/shortcodes/generated/all_jobmanager_section.html
##########
@@ -56,6 +56,12 @@
<td>Integer</td>
<td>The size of the IO thread pool to run blocking operations for all spawned JobMasters. This includes recovery and completion of checkpoints. Increase this value if you experience slow checkpoint operations when running many jobs. If no value is specified, then Flink defaults to the number of available CPU cores.</td>
</tr>
+ <tr>
+ <td><h5>jobmanager.resource-id</h5></td>
Review comment:
nit: wrong commit
##########
File path:
flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
##########
@@ -157,6 +158,47 @@
UncaughtExceptionHandleMode.LOG.name(),
UncaughtExceptionHandleMode.FAIL.name()));
+ @Documentation.OverrideDefault("io.tmp.dirs")
+ @Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
+ public static final ConfigOption<String> PROCESS_WORKING_DIR_BASE =
+ ConfigOptions.key("process.working-dir")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Working directory for Flink processes. "
+ + "The working directory can be used to store information that can be used upon process recovery. "
+ + "If the not configured, then it will default to the first temporary directory defined via %s.",
+ TextElement.code(CoreOptions.TMP_DIRS.key()))
+ .build());
+
+ @Documentation.OverrideDefault("process.working-dir")
Review comment:
Why is this not a fallbackKey?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
##########
@@ -507,9 +530,15 @@ private Configuration generateClusterConfiguration(Configuration configuration)
final CompletableFuture<Void> rpcSystemClassLoaderCloseFuture =
FutureUtils.runAfterwards(serviceShutdownFuture, rpcSystem::close);
- final CompletableFuture<Void> cleanupDirectoriesFuture =
- FutureUtils.runAfterwards(
- rpcSystemClassLoaderCloseFuture, this::cleanupDirectories);
+ final CompletableFuture<Void> cleanupDirectoriesFuture;
+
+ if (shutdownBehaviour == ShutdownBehaviour.STOP_APPLICATION) {
Review comment:
This needs a comment to explain why we only do it for STOP_APPLICATION.
It's not obvious what the difference between application and process is in this
context.
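A sketch of the kind of comment that could accompany the check. It assumes that `STOP_APPLICATION` means the application has terminated for good, while the other behaviour (called `STOP_PROCESS` here, an assumed name) means only this process goes away and a successor may recover from its working directory. The class and method are hypothetical.

```java
/** Hypothetical sketch of the shutdown decision; not Flink's ClusterEntrypoint. */
final class ShutdownCleanupSketch {
    enum ShutdownBehaviour {
        /** The application is finished for good; nothing will recover from this process. */
        STOP_APPLICATION,
        /** Assumed name: only this process stops; a successor may take over its work. */
        STOP_PROCESS
    }

    private ShutdownCleanupSketch() {}

    static boolean shouldDeleteWorkingDirectory(ShutdownBehaviour behaviour) {
        // The working directory stores information needed to recover a restarted process.
        // If only the process stops, a successor may still need that information, so keep it.
        // Once the application stops, no process will recover from it, so it can be deleted.
        return behaviour == ShutdownBehaviour.STOP_APPLICATION;
    }
}
```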
##########
File path:
flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
##########
@@ -157,6 +158,47 @@
UncaughtExceptionHandleMode.LOG.name(),
UncaughtExceptionHandleMode.FAIL.name()));
+ @Documentation.OverrideDefault("io.tmp.dirs")
+ @Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
+ public static final ConfigOption<String> PROCESS_WORKING_DIR_BASE =
+ ConfigOptions.key("process.working-dir")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Working directory for Flink processes. "
+ + "The working directory can be used to store information that can be used upon process recovery. "
+ + "If the not configured, then it will default to the first temporary directory defined via %s.",
+ TextElement.code(CoreOptions.TMP_DIRS.key()))
+ .build());
+
+ @Documentation.OverrideDefault("process.working-dir")
+ @Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
+ public static final ConfigOption<String> JOB_MANAGER_PROCESS_WORKING_DIR_BASE =
+ ConfigOptions.key("process.jobmanager.working-dir")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Working directory for the JobManager process. The working directory can be used to store information that can be used upon process recovery. If the not configured, then it will default to %s.",
+ TextElement.code(PROCESS_WORKING_DIR_BASE.key()))
+ .build());
+
+ @Documentation.OverrideDefault("process.working-dir")
+ @Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
+ public static final ConfigOption<String> TASK_MANAGER_PROCESS_WORKING_DIR_BASE =
+ ConfigOptions.key("process.taskmanager.working-dir")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Working directory for the TaskManager process. The working directory can be used to store information that can be used upon process recovery. If the not configured, then it will default to %s.",
Review comment:
```suggestion
"Working directory for Flink TaskManager processes. The working directory can be used to store information that can be used upon process recovery. If not configured, then it will default to %s.",
```
Just for consistency with the generic option.
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/recovery/ClusterEntrypointITCase.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.DispatcherProcess;
+import org.apache.flink.test.util.TestProcessBuilder;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.time.Duration;
+
+import static org.junit.Assert.assertTrue;
+
+/** Integration tests for the {@link org.apache.flink.runtime.entrypoint.ClusterEntrypoint}. */
+public class ClusterEntrypointITCase extends TestLogger {
+
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ @Test
+ public void testWorkingDirectoryIsNotDeletedInCaseOfProcessFailure() throws Exception {
+ final File workingDirBase = TEMPORARY_FOLDER.newFolder();
+ final ResourceID resourceId = ResourceID.generate();
+
+ final Configuration configuration = new Configuration();
+ configuration.set(
+ ClusterOptions.PROCESS_WORKING_DIR_BASE, workingDirBase.getAbsolutePath());
+ configuration.set(JobManagerOptions.JOB_MANAGER_RESOURCE_ID, resourceId.toString());
+
+ final File workingDirectory =
+ ClusterEntrypointUtils.generateJobManagerWorkingDirectoryFile(
+ configuration, resourceId);
+
+ final TestProcessBuilder.TestProcess taskManagerProcess =
Review comment:
This is not a TaskManager process, so the name `taskManagerProcess` is misleading.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointUtils.java
##########
@@ -128,4 +134,106 @@ public static void configureUncaughtExceptionHandler(Configuration config) {
new ClusterUncaughtExceptionHandler(
config.get(ClusterOptions.UNCAUGHT_EXCEPTION_HANDLING)));
}
+
+ /**
+ * Creates the working directory for the TaskManager process. This method ensures that the
+ * working directory exists.
+ *
+ * @param configuration to extract the required settings from
+ * @param resourceId identifying the TaskManager process
+ * @return working directory
+ * @throws IOException if the working directory could not be created
+ */
+ public static WorkingDirectory createTaskManagerWorkingDirectory(
+ Configuration configuration, ResourceID resourceId) throws IOException {
+ return WorkingDirectory.create(
+ generateTaskManagerWorkingDirectoryFile(configuration, resourceId));
+ }
+
+ /**
+ * Generates the working directory {@link File} for the TaskManager process. This method does
+ * not ensure that the working directory exists.
+ *
+ * @param configuration to extract the required settings from
+ * @param resourceId identifying the TaskManager process
+ * @return working directory file
+ */
+ @VisibleForTesting
+ public static File generateTaskManagerWorkingDirectoryFile(
+ Configuration configuration, ResourceID resourceId) {
+ return generateWorkingDirectoryFile(
+ configuration,
+ Optional.of(ClusterOptions.TASK_MANAGER_PROCESS_WORKING_DIR_BASE),
+ "tm_" + resourceId);
+ }
+
+ /**
+ * Generates the working directory {@link File} for the JobManager process. This method does not
+ * ensure that the working directory exists.
+ *
+ * @param configuration to extract the required settings from
+ * @param resourceId identifying the JobManager process
+ * @return working directory file
+ */
+ @VisibleForTesting
+ public static File generateJobManagerWorkingDirectoryFile(
+ Configuration configuration, ResourceID resourceId) {
+ return generateWorkingDirectoryFile(
+ configuration,
+ Optional.of(ClusterOptions.JOB_MANAGER_PROCESS_WORKING_DIR_BASE),
+ "jm_" + resourceId);
+ }
+
+ /**
+ * Generate the working directory from the given configuration. If a preceding option is
+ * specified, then this config option will be read first for the working directory. Next {@link
+ * ClusterOptions#PROCESS_WORKING_DIR_BASE} will be tried. At last, {@link CoreOptions#TMP_DIRS}
+ * will be used to extract the working directory base from.
+ *
+ * @param configuration to extract the working directory from
+ * @param precedingOption optional preceding option
+ * @param workingDirectoryName name of the working directory to create
+ * @return working directory
+ */
+ public static File generateWorkingDirectoryFile(
+ Configuration configuration,
+ Optional<ConfigOption<String>> precedingOption,
+ String workingDirectoryName) {
+ final Optional<String> optionalWorkingDirectory =
+ getOptionalWorkingDirectory(configuration, precedingOption);
+
+ final File workingDirectoryBase =
+ optionalWorkingDirectory
+ .map(File::new)
+ .orElseGet(() -> ConfigurationUtils.getFirstTempDirectory(configuration));
+
+ return new File(workingDirectoryBase, workingDirectoryName);
+ }
+
+ private static Optional<String> getOptionalWorkingDirectory(
+ Configuration configuration, Optional<ConfigOption<String>> precedingOption) {
Review comment:
This seems overly complicated. If the JM/TM options had a fallback key
for PROCESS_WORKING_DIR_BASE this could be simpler.
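A rough sketch of the simplification, assuming the JM/TM options declare `process.working-dir` as a fallback key (as Flink's `ConfigOption#withFallbackKeys` allows). A plain `Map` and a small `lookup` helper emulate the fallback resolution, and the first temporary directory is passed in rather than computed. All class and method names are hypothetical.

```java
import java.io.File;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;

/** Hypothetical sketch; a Map stands in for Flink's Configuration. */
final class WorkingDirectoryResolution {
    static final String PROCESS_WORKING_DIR = "process.working-dir";
    static final String JM_WORKING_DIR = "process.jobmanager.working-dir";
    static final String TM_WORKING_DIR = "process.taskmanager.working-dir";

    private WorkingDirectoryResolution() {}

    /** Emulates an option lookup: the main key first, then each fallback key in order. */
    static Optional<String> lookup(Map<String, String> config, String key, String... fallbackKeys) {
        return Stream.concat(Stream.of(key), Arrays.stream(fallbackKeys))
                .map(config::get)
                .filter(Objects::nonNull)
                .findFirst();
    }

    static File jobManagerWorkingDirectoryFile(
            Map<String, String> config, String resourceId, File firstTempDirectory) {
        return workingDirectoryFile(config, JM_WORKING_DIR, "jm_" + resourceId, firstTempDirectory);
    }

    static File taskManagerWorkingDirectoryFile(
            Map<String, String> config, String resourceId, File firstTempDirectory) {
        return workingDirectoryFile(config, TM_WORKING_DIR, "tm_" + resourceId, firstTempDirectory);
    }

    /** With the fallback key in place, one lookup replaces the preceding-option handling. */
    private static File workingDirectoryFile(
            Map<String, String> config, String processKey, String name, File firstTempDirectory) {
        final File base =
                lookup(config, processKey, PROCESS_WORKING_DIR)
                        .map(File::new)
                        .orElse(firstTempDirectory);
        return new File(base, name);
    }
}
```

The per-process options then only differ in their key and directory prefix, and `getOptionalWorkingDirectory` becomes unnecessary.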
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/WorkingDirectoryTest.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
Review comment:
New tests should use JUnit 5.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
##########
@@ -95,12 +94,14 @@
@Test
public void testExitJvmOnOutOfMemory() throws Exception {
// this test works only on linux
- assumeTrue(OperatingSystem.isLinux());
+ // assumeTrue(OperatingSystem.isLinux());
Review comment:
Maybe replace this with `assumeFalse(OperatingSystem.isWindows());`
--
This is an automated message from the Apache Git Service.