This is an automated email from the ASF dual-hosted git repository.
ajothomas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 0cf9f9a8d Fix get future exception message (#1712)
0cf9f9a8d is described below
commit 0cf9f9a8d44544170919464357310bc2d41a2e11
Author: ajo thomas <[email protected]>
AuthorDate: Tue Oct 15 14:39:43 2024 -0700
Fix get future exception message (#1712)
---
.../storage/blobstore/util/BlobStoreUtil.java | 2 +-
.../storage/blobstore/util/TestBlobStoreUtil.java | 74 ++++++++++++++++++++++
2 files changed, 75 insertions(+), 1 deletion(-)
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
index bd78248cb..3735d6529 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
@@ -185,7 +185,7 @@ public class BlobStoreUtil {
.thenApplyAsync(f ->
snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor)
.handle((snapshotIndex, ex) -> {
if (ex != null) {
- throw new SamzaException(String.format("Unable to deserialize
SnapshotIndex bytes for blob ID: %s", blobId), ex);
+ throw new SamzaException(String.format("Unable to get
SnapshotIndex blob. The blob ID is : %s", blobId), ex);
}
return snapshotIndex;
});
diff --git a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
index a44f86e64..732fd472b 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
@@ -28,6 +28,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -50,6 +51,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.zip.CRC32;
@@ -78,6 +80,7 @@ import org.apache.samza.storage.blobstore.index.SnapshotIndex;
import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
import org.apache.samza.util.FileUtil;
import org.apache.samza.util.FutureUtil;
+import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -920,6 +923,24 @@ public class TestBlobStoreUtil {
checkpoint, storesToBackupOrRestore, false);
}
+ @Test
+ public void testSerdeException() throws ExecutionException,
InterruptedException {
+ final String blobId = "foo";
+
+ final BlobStoreManager testBlobStoreManager = new
DeserTestBlobStoreManager();
+ final BlobStoreUtil util = new BlobStoreUtil(testBlobStoreManager,
Executors.newSingleThreadExecutor(), blobStoreConfig, null, null);
+
+ final CompletableFuture<SnapshotIndex> future =
util.getSnapshotIndex(blobId, mock(Metadata.class), true)
+ .handle((snapshotIndex, throwable) -> {
+ if (throwable != null) {
+ Assert.assertEquals(throwable.getMessage(), String.format("Unable
to get SnapshotIndex blob. The blob ID is : %s", blobId));
+ Assert.assertEquals(throwable.getCause().getMessage(),
"org.apache.samza.SamzaException: Exception in deserializing SnapshotIndex
bytes foobar");
+ }
+ return snapshotIndex;
+ });
+ future.get();
+ }
+
@Test
public void testGetSSIThrowsExceptionIfAnyNonIgnoredAsyncBlobStoreErrors() {
String store = "storeName1";
@@ -1045,4 +1066,57 @@ public class TestBlobStoreUtil {
factoryStoreSCMs.put(stateBackendFactory, storeSCMs);
return new CheckpointV2(checkpointId, ImmutableMap.of(), factoryStoreSCMs);
}
+
+ /**
+ * Test {@link BlobStoreManager} to be used to assert SnapshotIndex deserialization failure
+ * exception message.
+ * We write a dummy string's bytes to the OutputStream parameter of get method instead of a SnapshotIndex
+ * blob. The OutputStream is used by SnapshotIndexSerde which will fail during deserialization.
+ * */
+ private static class DeserTestBlobStoreManager extends TestBlobStoreManager {
+ @Override
+ public CompletionStage<Void> get(String id, OutputStream outputStream,
Metadata metadata, boolean getDeletedBlob) {
+ final String randBlob = "foobar";
+ final byte[] byteArray = randBlob.getBytes(StandardCharsets.UTF_8);
+ try {
+ outputStream.write(byteArray);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ /**
+ * Test BlobStoreManager for unit tests.
+ * */
+ private static class TestBlobStoreManager implements BlobStoreManager {
+ @Override
+ public void init() {
+ }
+
+ @Override
+ public CompletionStage<String> put(InputStream inputStream, Metadata
metadata) {
+ return null;
+ }
+
+ @Override
+ public CompletionStage<Void> get(String id, OutputStream outputStream,
Metadata metadata, boolean getDeletedBlob) {
+ return null;
+ }
+
+ @Override
+ public CompletionStage<Void> delete(String id, Metadata metadata) {
+ return null;
+ }
+
+ @Override
+ public CompletionStage<Void> removeTTL(String blobId, Metadata metadata) {
+ return null;
+ }
+
+ @Override
+ public void close() {
+ }
+ }
}