This is an automated email from the ASF dual-hosted git repository.
rabreu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 017d7cc36 KeyAlreadyExistsException now does not cause downloadMissingBlob to return false (#8273)
017d7cc36 is described below
commit 017d7cc3669afefb3427ad7b31237cd9a14fa02a
Author: reiabreu <[email protected]>
AuthorDate: Thu Oct 9 09:59:12 2025 +0100
KeyAlreadyExistsException now does not cause downloadMissingBlob to return false (#8273)
* KeyAlreadyExistsException now does not cause downloadMissingBlob to return false
---
.../org/apache/storm/blobstore/BlobStoreUtils.java | 1 +
.../apache/storm/blobstore/BlobStoreUtilsTest.java | 51 +++++++++++++++++++++-
2 files changed, 50 insertions(+), 2 deletions(-)
diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
index cc1d54550..f42b0c7e0 100644
--- a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -150,6 +150,7 @@ public class BlobStoreUtils {
throw new RuntimeException(exception);
} catch (KeyAlreadyExistsException kae) {
LOG.info("KeyAlreadyExistsException Key: {} {}", key, kae);
+ isSuccess = true;
} catch (KeyNotFoundException knf) {
// Catching and logging KeyNotFoundException because, if
// there is a subsequent update and delete, the non-leader
diff --git a/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreUtilsTest.java b/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreUtilsTest.java
index fa4352691..24fb1b59b 100644
--- a/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreUtilsTest.java
+++ b/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreUtilsTest.java
@@ -12,17 +12,35 @@
package org.apache.storm.blobstore;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+
+import org.apache.storm.generated.*;
import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.thrift.TException;
+import org.apache.storm.utils.NimbusClient;
import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import javax.security.auth.Subject;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.atLeastOnce;
+
+
public class BlobStoreUtilsTest {
@@ -126,4 +144,33 @@ public class BlobStoreUtilsTest {
verify(nimbusDetails).getHost();
verify(conf, atLeastOnce()).get(anyString());
}
+
+ @Test
+ public void testDownloadMissingBlob_KeyAkreadyExists() throws TException, IOException {
+
+ NimbusClient.Builder builder1 = mock(NimbusClient.Builder.class);
+ NimbusClient.Builder builder2 = mock(NimbusClient.Builder.class);
+ NimbusClient client = mock(NimbusClient.class);
+ ReadableBlobMeta readableBlobMeta = mock(ReadableBlobMeta.class);
+ Nimbus.Iface iface = mock(Nimbus.Iface.class);
+
+
+ try (MockedStatic<NimbusClient.Builder> mockedNimbusClient = Mockito.mockStatic(NimbusClient.Builder.class)) {
+
+ mockedNimbusClient.when(() ->NimbusClient.Builder.withConf(anyMap())).thenReturn(builder1);
+ when(builder1.forDaemon()).thenReturn(builder2);
+ when(builder2.buildWithNimbusHostPort(anyString(),anyInt())).thenReturn(client);
+ when(client.getClient()).thenReturn(iface);
+ when(iface.getBlobMeta(anyString())).thenReturn(readableBlobMeta);
+ when(readableBlobMeta.get_settable()).thenReturn(new SettableBlobMeta());
+ when(iface.beginBlobDownload(anyString())).thenReturn(new BeginDownloadResult());
+ when(nimbusDetails.getHost()).thenReturn("localhost");
+ when(nimbusDetails.getPort()).thenReturn(1234);
+
+ doThrow(new KeyAlreadyExistsException()).when(blobStore).createBlob(anyString(),any(InputStream.class),any(SettableBlobMeta.class),any(Subject.class));
+
+ assertTrue((BlobStoreUtils.downloadMissingBlob(conf, blobStore, "testKey", Collections.singleton(nimbusDetails))));
+ }
+
+ }
}