dmvk commented on a change in pull request #18237:
URL: https://github.com/apache/flink/pull/18237#discussion_r776626328



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
##########
@@ -62,4 +64,7 @@
     default String getTaskManagerBindAddress() {
         return getConfiguration().getString(TaskManagerOptions.BIND_HOST);
     }
+
+    /** Gets the temporary working directory of the TaskManager instance. */

Review comment:
       nit: missing @return 

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
##########
@@ -18,7 +18,6 @@
 

Review comment:
       OT: It would great to eventually get rid of this class completely

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/FileSlotAllocationSnapshotPersistenceService.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.util.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * File based {@link SlotAllocationSnapshotPersistenceService} that persists 
the {@link
+ * SlotAllocationSnapshot} as local files.
+ */
+public class FileSlotAllocationSnapshotPersistenceService
+        implements SlotAllocationSnapshotPersistenceService {
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(FileSlotAllocationSnapshotPersistenceService.class);
+
+    static final String SUFFIX = ".bin";
+    private final File slotAllocationSnapshotDirectory;
+
+    public FileSlotAllocationSnapshotPersistenceService(File 
slotAllocationSnapshotDirectory) {
+        this.slotAllocationSnapshotDirectory = slotAllocationSnapshotDirectory;
+
+        if (!slotAllocationSnapshotDirectory.exists()
+                && !slotAllocationSnapshotDirectory.mkdirs()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot create the slot allocation snapshot 
directory %s.",
+                            slotAllocationSnapshotDirectory));
+        }
+    }
+
+    @Override
+    public void persistAllocationSnapshot(SlotAllocationSnapshot 
slotAllocationSnapshot)
+            throws IOException {
+        // Let's try to write the slot allocations on file
+        final File slotAllocationSnapshotFile =
+                
slotAllocationFile(slotAllocationSnapshot.getSlotID().getSlotNumber());
+
+        try (ObjectOutputStream oos =
+                new ObjectOutputStream(new 
FileOutputStream(slotAllocationSnapshotFile))) {
+            oos.writeObject(slotAllocationSnapshot);
+
+            LOG.debug(
+                    "Successfully written allocation state metadata file {} 
for job {} and allocation {}.",
+                    slotAllocationSnapshotFile.toPath(),
+                    slotAllocationSnapshot.getJobId(),
+                    slotAllocationSnapshot.getAllocationId());
+        }
+    }
+
+    private File slotAllocationFile(int slotIndex) {
+        return new File(
+                slotAllocationSnapshotDirectory.getAbsolutePath(), 
slotIndexToFilename(slotIndex));
+    }
+
+    @Nonnull
+    private String slotIndexToFilename(int slotIndex) {

Review comment:
       ```suggestion
       private static String slotIndexToFilename(int slotIndex) {
   ```

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
##########
@@ -321,8 +321,7 @@ private void lazyInitializeForJob(
 
         // initialize the paths where the local RocksDB files should be stored
         if (localRocksDbDirectories == null) {
-            // initialize from the temp directories
-            initializedDbBasePaths = 
env.getIOManager().getSpillingDirectories();
+            initializedDbBasePaths = new File[] 
{env.getTaskManagerInfo().getTmpWorkingDirectory()};

Review comment:
       I'm not sure this is alright :/ We basically limit RocksDB to a single 
disk by this change.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/FileSlotAllocationSnapshotPersistenceService.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.util.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * File based {@link SlotAllocationSnapshotPersistenceService} that persists 
the {@link
+ * SlotAllocationSnapshot} as local files.
+ */
+public class FileSlotAllocationSnapshotPersistenceService
+        implements SlotAllocationSnapshotPersistenceService {
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(FileSlotAllocationSnapshotPersistenceService.class);
+
+    static final String SUFFIX = ".bin";

Review comment:
       ```suggestion
       private static final String SUFFIX = ".bin";
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
##########
@@ -188,7 +189,8 @@ public static void main(String[] args) throws Exception {
                                 taskManagerConfig,
                                 
TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(
                                         copiedConf),
-                                
InetAddress.getLoopbackAddress().getHostAddress());
+                                
InetAddress.getLoopbackAddress().getHostAddress(),
+                                
Files.createTempDirectory("tmp_working_directory").toFile());

Review comment:
       do we need to clean this up explicitly in the test?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
##########
@@ -213,28 +218,30 @@ public void releaseLocalStateForAllocationId(@Nonnull 
AllocationID allocationID)
     }
 
     public void shutdown() {
-
-        HashMap<AllocationID, Map<JobVertexSubtaskKey, 
OwnedTaskLocalStateStore>> toRelease;
-
         synchronized (lock) {
             if (closed) {
                 return;
             }
 
             closed = true;
-            toRelease = new HashMap<>(taskStateStoresByAllocationID);
             taskStateStoresByAllocationID.clear();
         }
 
-        ShutdownHookUtil.removeShutdownHook(shutdownHook, 
getClass().getSimpleName(), LOG);
-
         LOG.info("Shutting down TaskExecutorLocalStateStoresManager.");
 
-        for (Map.Entry<AllocationID, Map<JobVertexSubtaskKey, 
OwnedTaskLocalStateStore>> entry :
-                toRelease.entrySet()) {
+        ShutdownHookUtil.removeShutdownHook(shutdownHook, 
getClass().getSimpleName(), LOG);
 
-            doRelease(entry.getValue().values());
-            cleanupAllocationBaseDirs(entry.getKey());
+        if (localStateRootDirectories.isOwned()) {

Review comment:
       Should we use some kind of reference counting here instead? Are we 
guaranteed that all of the "borrowed" references are not using the directories 
anymore? What happens if they do?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/FileSlotAllocationSnapshotPersistenceService.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.util.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * File based {@link SlotAllocationSnapshotPersistenceService} that persists 
the {@link
+ * SlotAllocationSnapshot} as local files.
+ */
+public class FileSlotAllocationSnapshotPersistenceService
+        implements SlotAllocationSnapshotPersistenceService {
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(FileSlotAllocationSnapshotPersistenceService.class);
+
+    static final String SUFFIX = ".bin";
+    private final File slotAllocationSnapshotDirectory;
+
+    public FileSlotAllocationSnapshotPersistenceService(File 
slotAllocationSnapshotDirectory) {
+        this.slotAllocationSnapshotDirectory = slotAllocationSnapshotDirectory;
+
+        if (!slotAllocationSnapshotDirectory.exists()
+                && !slotAllocationSnapshotDirectory.mkdirs()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot create the slot allocation snapshot 
directory %s.",
+                            slotAllocationSnapshotDirectory));
+        }
+    }
+
+    @Override
+    public void persistAllocationSnapshot(SlotAllocationSnapshot 
slotAllocationSnapshot)
+            throws IOException {
+        // Let's try to write the slot allocations on file
+        final File slotAllocationSnapshotFile =
+                
slotAllocationFile(slotAllocationSnapshot.getSlotID().getSlotNumber());
+
+        try (ObjectOutputStream oos =
+                new ObjectOutputStream(new 
FileOutputStream(slotAllocationSnapshotFile))) {
+            oos.writeObject(slotAllocationSnapshot);
+
+            LOG.debug(
+                    "Successfully written allocation state metadata file {} 
for job {} and allocation {}.",
+                    slotAllocationSnapshotFile.toPath(),
+                    slotAllocationSnapshot.getJobId(),
+                    slotAllocationSnapshot.getAllocationId());
+        }
+    }
+
+    private File slotAllocationFile(int slotIndex) {
+        return new File(
+                slotAllocationSnapshotDirectory.getAbsolutePath(), 
slotIndexToFilename(slotIndex));
+    }
+
+    @Nonnull
+    private String slotIndexToFilename(int slotIndex) {
+        return slotIndex + SUFFIX;
+    }
+
+    private int filenameToSlotIndex(String filename) {
+        return Integer.parseInt(filename.substring(0, filename.length() - 
SUFFIX.length()));
+    }
+
+    @Override
+    public void deleteAllocationSnapshot(int slotIndex) {
+        // Let's try to write the slot allocations on file
+        final File slotAllocationSnapshotFile = slotAllocationFile(slotIndex);
+        try {
+            FileUtils.deleteFileOrDirectory(slotAllocationSnapshotFile);
+            LOG.debug(
+                    "Successfully deleted allocation state metadata file {}.",
+                    slotAllocationSnapshotFile.toPath());
+        } catch (IOException ioe) {
+            LOG.debug(

Review comment:
       warning?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/FileSlotAllocationSnapshotPersistenceService.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.util.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * File based {@link SlotAllocationSnapshotPersistenceService} that persists 
the {@link
+ * SlotAllocationSnapshot} as local files.
+ */
+public class FileSlotAllocationSnapshotPersistenceService

Review comment:
       My biggest concern here is whether we could use a custom snapshot 
serializer instead of Java serialization. Something as simple as mapping the 
output to json would be fine.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/FileSlotAllocationSnapshotPersistenceService.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.util.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * File based {@link SlotAllocationSnapshotPersistenceService} that persists 
the {@link
+ * SlotAllocationSnapshot} as local files.
+ */
+public class FileSlotAllocationSnapshotPersistenceService
+        implements SlotAllocationSnapshotPersistenceService {
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(FileSlotAllocationSnapshotPersistenceService.class);
+
+    static final String SUFFIX = ".bin";
+    private final File slotAllocationSnapshotDirectory;
+
+    public FileSlotAllocationSnapshotPersistenceService(File 
slotAllocationSnapshotDirectory) {
+        this.slotAllocationSnapshotDirectory = slotAllocationSnapshotDirectory;
+
+        if (!slotAllocationSnapshotDirectory.exists()
+                && !slotAllocationSnapshotDirectory.mkdirs()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot create the slot allocation snapshot 
directory %s.",
+                            slotAllocationSnapshotDirectory));
+        }
+    }
+
+    @Override
+    public void persistAllocationSnapshot(SlotAllocationSnapshot 
slotAllocationSnapshot)
+            throws IOException {
+        // Let's try to write the slot allocations on file
+        final File slotAllocationSnapshotFile =
+                
slotAllocationFile(slotAllocationSnapshot.getSlotID().getSlotNumber());
+
+        try (ObjectOutputStream oos =
+                new ObjectOutputStream(new 
FileOutputStream(slotAllocationSnapshotFile))) {
+            oos.writeObject(slotAllocationSnapshot);
+
+            LOG.debug(
+                    "Successfully written allocation state metadata file {} 
for job {} and allocation {}.",
+                    slotAllocationSnapshotFile.toPath(),
+                    slotAllocationSnapshot.getJobId(),
+                    slotAllocationSnapshot.getAllocationId());
+        }
+    }
+
+    private File slotAllocationFile(int slotIndex) {
+        return new File(
+                slotAllocationSnapshotDirectory.getAbsolutePath(), 
slotIndexToFilename(slotIndex));
+    }
+
+    @Nonnull

Review comment:
       nit: aren't we implicitly treating everything as nonnull?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/FileSlotAllocationSnapshotPersistenceService.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.util.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * File based {@link SlotAllocationSnapshotPersistenceService} that persists 
the {@link
+ * SlotAllocationSnapshot} as local files.
+ */
+public class FileSlotAllocationSnapshotPersistenceService
+        implements SlotAllocationSnapshotPersistenceService {
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(FileSlotAllocationSnapshotPersistenceService.class);
+
+    static final String SUFFIX = ".bin";
+    private final File slotAllocationSnapshotDirectory;
+
+    public FileSlotAllocationSnapshotPersistenceService(File 
slotAllocationSnapshotDirectory) {
+        this.slotAllocationSnapshotDirectory = slotAllocationSnapshotDirectory;
+
+        if (!slotAllocationSnapshotDirectory.exists()
+                && !slotAllocationSnapshotDirectory.mkdirs()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot create the slot allocation snapshot 
directory %s.",
+                            slotAllocationSnapshotDirectory));
+        }
+    }
+
+    @Override
+    public void persistAllocationSnapshot(SlotAllocationSnapshot 
slotAllocationSnapshot)
+            throws IOException {
+        // Let's try to write the slot allocations on file
+        final File slotAllocationSnapshotFile =
+                
slotAllocationFile(slotAllocationSnapshot.getSlotID().getSlotNumber());
+
+        try (ObjectOutputStream oos =
+                new ObjectOutputStream(new 
FileOutputStream(slotAllocationSnapshotFile))) {
+            oos.writeObject(slotAllocationSnapshot);
+
+            LOG.debug(
+                    "Successfully written allocation state metadata file {} 
for job {} and allocation {}.",
+                    slotAllocationSnapshotFile.toPath(),
+                    slotAllocationSnapshot.getJobId(),
+                    slotAllocationSnapshot.getAllocationId());
+        }
+    }
+
+    private File slotAllocationFile(int slotIndex) {
+        return new File(
+                slotAllocationSnapshotDirectory.getAbsolutePath(), 
slotIndexToFilename(slotIndex));
+    }
+
+    @Nonnull
+    private String slotIndexToFilename(int slotIndex) {
+        return slotIndex + SUFFIX;
+    }
+
+    private int filenameToSlotIndex(String filename) {

Review comment:
       ```suggestion
       private static int filenameToSlotIndex(String filename) {
   ```

##########
File path: docs/layouts/shortcodes/generated/cluster_configuration.html
##########
@@ -80,5 +80,23 @@
             <td>Boolean</td>
             <td>Whether to convert all PIPELINE edges to BLOCKING when apply 
fine-grained resource management in batch jobs.</td>
         </tr>
+        <tr>
+            <td><h5>process.jobmanager.working-dir</h5></td>
+            <td style="word-wrap: break-word;">process.working-dir</td>
+            <td>String</td>
+            <td>Working directory for the JobManager process. The working 
directory can be used to store information that can be used upon process 
recovery. If the not configured, then it will default to <code 
class="highlighter-rouge">process.working-dir</code>.</td>
+        </tr>
+        <tr>
+            <td><h5>process.taskmanager.working-dir</h5></td>
+            <td style="word-wrap: break-word;">process.working-dir</td>
+            <td>String</td>
+            <td>Working directory for the TaskManager process. The working 
directory can be used to store information that can be used upon process 
recovery. If the not configured, then it will default to <code 
class="highlighter-rouge">process.working-dir</code>.</td>
+        </tr>
+        <tr>
+            <td><h5>process.working-dir</h5></td>
+            <td style="word-wrap: break-word;">io.tmp.dirs</td>
+            <td>String</td>
+            <td>Working directory for Flink processes. The working directory 
can be used to store information that can be used upon process recovery. If the 
not configured, then it will default to <code 
class="highlighter-rouge">io.tmp.dirs</code>.</td>

Review comment:
       This is mapping multiple directories into a single directory. Can you 
add an explanation on which directory would be picked in case of multiple io 
dirs?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to