zhuzhurk commented on a change in pull request #16538:
URL: https://github.com/apache/flink/pull/16538#discussion_r672971499
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
##########
@@ -778,6 +778,50 @@ boolean deleteInternal(@Nullable JobID jobId, TransientBlobKey key) {
}
}
+ /**
+ * Deletes the file associated with the blob key in the local storage of the blob server.
+ *
+ * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
+ * @param key blob key associated with the file to be deleted
+ * @return <tt>true</tt> if the given blob is successfully deleted or non-existing;
+ * <tt>false</tt> otherwise
+ */
+ private boolean deleteInternal(JobID jobId, PermanentBlobKey key) {
+ final File localFile =
+ new File(
+ BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));
+
+ readWriteLock.writeLock().lock();
+
+ try {
+ boolean deleteLocally = true;
+ if (!localFile.delete() && localFile.exists()) {
Review comment:
Will an exception be thrown if the file was already deleted by another thread?
Should we check the existence of the file before trying to delete it?
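
For reference, a minimal self-contained sketch (not part of the patch) of the `java.io.File#delete()` semantics in question: deleting a file that another thread already removed does not throw, it simply returns false, so checking `exists()` first is what distinguishes "already gone" from "failed to delete". The class and the `deleteIfExists` helper below are hypothetical, for illustration only.

    import java.io.File;
    import java.io.IOException;

    public class FileDeleteSemanticsDemo {

        // Ordering suggested above: treat a missing file as already deleted,
        // and only report failure when an existing file cannot be removed.
        static boolean deleteIfExists(File file) {
            return !file.exists() || file.delete();
        }

        public static void main(String[] args) throws IOException {
            File f = File.createTempFile("blob-demo", ".bin");

            System.out.println("first delete:   " + f.delete());         // true
            // No exception here: delete() on a missing file returns false.
            System.out.println("second delete:  " + f.delete());         // false
            System.out.println("deleteIfExists: " + deleteIfExists(f));  // true (already gone)
        }
    }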
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
##########
@@ -251,6 +251,104 @@ private void testDeleteTransientFails(@Nullable final JobID jobId) throws IOExce
}
}
+ @Test
+ public void testDeletePermanent() throws IOException {
Review comment:
Is it possible to reuse the code of `testDeleteTransient`, `testDeleteTransientAlreadyDeleted` and `testDeleteTransientFails`?
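
One possible way to share the logic, as a rough sketch only: extract the common put/delete/verify steps into a helper parameterized over how a blob is written and deleted, and let the transient and permanent test variants pass in the corresponding operations. All names below (`BlobDeleteScenario`, `runDeleteScenario`, the `PutBlob`/`DeleteBlob` interfaces) are hypothetical and not taken from the existing `BlobServerDeleteTest`.

    import java.io.IOException;

    import org.apache.flink.api.common.JobID;

    import static org.junit.Assert.assertTrue;

    /** Hypothetical helper sketch for sharing the delete-test logic. */
    public class BlobDeleteScenario {

        /** How the concrete test writes a blob (transient or permanent). */
        public interface PutBlob<K> {
            K put(JobID jobId, byte[] data) throws IOException;
        }

        /** How the concrete test deletes a blob of that kind. */
        public interface DeleteBlob<K> {
            boolean delete(JobID jobId, K key) throws IOException;
        }

        /** Shared scenario: put a blob, delete it, then delete it again. */
        public static <K> void runDeleteScenario(
                JobID jobId, PutBlob<K> put, DeleteBlob<K> delete) throws IOException {
            final byte[] data = new byte[] {1, 2, 3};
            final K key = put.put(jobId, data);

            // The first delete removes the blob ...
            assertTrue(delete.delete(jobId, key));
            // ... and deleting an already-deleted blob is still reported as success.
            assertTrue(delete.delete(jobId, key));
        }
    }

The transient and permanent variants would then only differ in the put and delete operations they pass to the helper.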
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobWriterTest.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link BlobWriter}. */
+public class BlobWriterTest extends TestLogger {
+
+ @Test
+ public void testSerializeAndTryOffload() throws IOException, ClassNotFoundException {
Review comment:
I think it does not make much sense to add a test case that verifies a test class when that class is only used by its own unit test.
I think the newly added tests in `BlobServerDeleteTest` are enough.
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java
##########
@@ -103,4 +104,6 @@ void notifySchedulerNgAboutInternalTaskFailure(
ExecutionVertex getExecutionVertexOrThrow(ExecutionVertexID id);
IntermediateResultPartition getResultPartitionOrThrow(final IntermediateResultPartitionID id);
+
+ void notifyBlobWriterDeleteCache(List<PermanentBlobKey> blobKeys);
Review comment:
Maybe name it `deleteBlobCache(...)`?