This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 3050d373ed09956bad08eb88b75c4c35fe7a38f3
Author: Hussain Towaileb <[email protected]>
AuthorDate: Mon Oct 13 21:50:16 2025 +0300

    [ASTERIXDB-3657][FAIL]: handle non-serializable exceptions
    
    Ext-ref: MB-68744
    Change-Id: I55e69623068a6c1759803cc699417021910237fd
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20485
    Reviewed-by: Hussain Towaileb <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Michael Blow <[email protected]>
    Tested-by: Hussain Towaileb <[email protected]>
---
 .../asterix/hyracks/bootstrap/CCApplication.java   |  9 +++
 .../asterix/hyracks/bootstrap/NCApplication.java   |  9 +++
 .../external/input/HDFSDataSourceFactory.java      |  3 +-
 .../writer/HDFSExternalFileWriterFactory.java      |  2 +-
 ...ReplacementsAwareJavaSerializationProvider.java | 82 ++++++++++++++++++++++
 .../hyracks/api/util/JavaSerializationUtils.java   | 39 +++++++---
 .../apache/hyracks/util/ThrowingIOFunction.java    | 40 +++++++++++
 7 files changed, 174 insertions(+), 10 deletions(-)

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 4e35ade636..1af75c168c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -25,6 +25,7 @@ import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNEC
 import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.ACTIVE;
 import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.REBALANCE_REQUIRED;
 import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.SHUTTING_DOWN;
+import static org.apache.hyracks.api.util.JavaSerializationUtils.registerReplacement;
 import static org.apache.hyracks.control.common.controllers.ControllerConfig.Option.CLOUD_DEPLOYMENT;
 
 import java.io.File;
@@ -120,6 +121,7 @@ import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.result.IJobResultCallback;
+import org.apache.hyracks.api.util.JavaSerializationUtils.SerializableExceptionProxy;
 import org.apache.hyracks.control.cc.BaseCCApplication;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.controllers.CCConfig;
@@ -137,6 +139,8 @@ import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import com.azure.storage.blob.models.BlobStorageException;
+
 public class CCApplication extends BaseCCApplication {
 
     private static final Logger LOGGER = LogManager.getLogger();
@@ -154,6 +158,11 @@ public class CCApplication extends BaseCCApplication {
         ccServiceCtx.setThreadFactory(
                 new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new LifeCycleComponentManager()));
         validateEnvironment();
+        registerSerializationReplacements();
+    }
+
+    private void registerSerializationReplacements() {
+        registerReplacement(BlobStorageException.class, SerializableExceptionProxy::new);
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 0e6a04be20..da606c0d4c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -25,6 +25,7 @@ import static org.apache.asterix.common.utils.Servlets.QUERY_SERVICE;
 import static org.apache.asterix.common.utils.Servlets.QUERY_STATUS;
 import static org.apache.asterix.common.utils.Servlets.UDF;
 import static org.apache.asterix.common.utils.Servlets.UDF_RECOVERY;
+import static org.apache.hyracks.api.util.JavaSerializationUtils.registerReplacement;
 import static org.apache.hyracks.control.common.controllers.ControllerConfig.Option.CLOUD_DEPLOYMENT;
 
 import java.io.File;
@@ -102,6 +103,7 @@ import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IFileDeviceResolver;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
+import org.apache.hyracks.api.util.JavaSerializationUtils.SerializableExceptionProxy;
 import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.nc.BaseNCApplication;
 import org.apache.hyracks.control.nc.NodeControllerService;
@@ -115,6 +117,8 @@ import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import com.azure.storage.blob.models.BlobStorageException;
+
 public class NCApplication extends BaseNCApplication {
     private static final Logger LOGGER = LogManager.getLogger();
     protected NCExtensionManager ncExtensionManager;
@@ -141,6 +145,11 @@ public class NCApplication extends BaseNCApplication {
                 new AsterixThreadFactory(ncServiceCtx.getThreadFactory(), ncServiceCtx.getLifeCycleComponentManager()));
         validateEnvironment();
         configurePersistedResourceRegistry();
+        registerSerializationReplacements();
+    }
+
+    private void registerSerializationReplacements() {
+        registerReplacement(BlobStorageException.class, SerializableExceptionProxy::new);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index 8af73428bb..b820147c93 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -166,7 +166,8 @@ public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IExt
         } catch (InterruptedException ex) {
             throw HyracksDataException.create(ex);
         } catch (IOException ex) {
-            throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ExceptionUtils.getMessageOrToString(ex));
+            throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ex,
+                    ExceptionUtils.getMessageOrToString(ex));
         }
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
index 5720fc32a5..711cd0506d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
@@ -137,7 +137,7 @@ public class HDFSExternalFileWriterFactory implements IExternalFileWriterFactory
                 doValidate(testFs);
             }
         } catch (IOException ex) {
-            throw new CompilationException(ErrorCode.EXTERNAL_SINK_ERROR, ExceptionUtils.getMessageOrToString(ex));
+            throw new CompilationException(ErrorCode.EXTERNAL_SINK_ERROR, ex, ExceptionUtils.getMessageOrToString(ex));
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ReplacementsAwareJavaSerializationProvider.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ReplacementsAwareJavaSerializationProvider.java
new file mode 100644
index 0000000000..15557961a0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ReplacementsAwareJavaSerializationProvider.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.comm;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hyracks.util.ThrowingIOFunction;
+
+public class ReplacementsAwareJavaSerializationProvider implements IJavaSerializationProvider {
+    public static final ReplacementsAwareJavaSerializationProvider INSTANCE =
+            new ReplacementsAwareJavaSerializationProvider();
+    private static final Map<Class<?>, ThrowingIOFunction<Object, Object>> replacements = new ConcurrentHashMap<>();
+
+    private ReplacementsAwareJavaSerializationProvider() {
+    }
+
+    @Override
+    public ObjectOutputStream newObjectOutputStream(OutputStream out) throws IOException {
+        return new ReplacementsAwareObjectOutputStream(out);
+    }
+
+    public Map<Class<?>, ThrowingIOFunction<Object, Object>> getReplacements() {
+        return Collections.unmodifiableMap(replacements);
+    }
+
+    public void registerReplacement(Class<?> clazz, ThrowingIOFunction<Object, Object> replacementFunction) {
+        replacements.put(clazz, replacementFunction);
+    }
+
+    private static class ReplacementsAwareObjectOutputStream extends ObjectOutputStream {
+        public ReplacementsAwareObjectOutputStream(OutputStream out) throws IOException {
+            super(out);
+            enableReplaceObject(true);
+        }
+
+        @Override
+        protected Object replaceObject(Object object) throws IOException {
+            Class<?> clazz = object.getClass();
+            if (clazz.isSynthetic()) {
+                return super.replaceObject(object);
+            }
+
+            // try exact match first (fast path)
+            ThrowingIOFunction<Object, Object> replacementFunction = replacements.get(clazz);
+            if (replacementFunction != null) {
+                return replacementFunction.process(object);
+            }
+
+            // fallback: match by assignability (handles subclasses / interfaces)
+            for (Map.Entry<Class<?>, ThrowingIOFunction<Object, Object>> e : replacements.entrySet()) {
+                if (e.getKey().isInstance(object)) {
+                    INSTANCE.registerReplacement(clazz, e.getValue());
+                    return e.getValue().process(object);
+                }
+            }
+
+            INSTANCE.registerReplacement(clazz, super::replaceObject);
+            return super.replaceObject(object);
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java
index 8e2420498e..f51feba61f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java
@@ -28,12 +28,15 @@ import java.io.ObjectStreamClass;
 import java.io.Serializable;
 import java.lang.reflect.Modifier;
 import java.lang.reflect.Proxy;
+import java.util.Map;
 
-import org.apache.hyracks.api.comm.DefaultJavaSerializationProvider;
 import org.apache.hyracks.api.comm.IJavaSerializationProvider;
+import org.apache.hyracks.api.comm.ReplacementsAwareJavaSerializationProvider;
+import org.apache.hyracks.util.ThrowingIOFunction;
 
 public class JavaSerializationUtils {
-    private static IJavaSerializationProvider serProvider = DefaultJavaSerializationProvider.INSTANCE;
+    private static final ReplacementsAwareJavaSerializationProvider serProvider =
+            ReplacementsAwareJavaSerializationProvider.INSTANCE;
 
     private JavaSerializationUtils() {
     }
@@ -89,10 +92,6 @@ public class JavaSerializationUtils {
         return Class.forName(className);
     }
 
-    public static void setSerializationProvider(IJavaSerializationProvider serProvider) {
-        JavaSerializationUtils.serProvider = serProvider;
-    }
-
     public static IJavaSerializationProvider getSerializationProvider() {
         return serProvider;
     }
@@ -106,7 +105,7 @@ public class JavaSerializationUtils {
     }
 
     private static class ClassLoaderObjectInputStream extends ObjectInputStream {
-        private ClassLoader classLoader;
+        private final ClassLoader classLoader;
 
         protected ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader)
                 throws IOException, SecurityException {
@@ -120,7 +119,7 @@ public class JavaSerializationUtils {
         }
 
         @Override
-        protected Class<?> resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException {
+        protected Class<?> resolveProxyClass(String[] interfaces) throws ClassNotFoundException {
             ClassLoader nonPublicLoader = null;
             boolean hasNonPublicInterface = false;
 
@@ -147,4 +146,28 @@ public class JavaSerializationUtils {
             }
         }
     }
+
+    public static Map<Class<?>, ThrowingIOFunction<Object, Object>> getReplacements() {
+        return serProvider.getReplacements();
+    }
+
+    public static <T> void registerReplacement(Class<T> clazz, ThrowingIOFunction<? super T, ?> replacementFunction) {
+        serProvider.registerReplacement(clazz, object -> replacementFunction.process(clazz.cast(object)));
+    }
+
+    public static class SerializableExceptionProxy extends Throwable {
+        private static final long serialVersionUID = 1L;
+        private final String type;
+
+        public SerializableExceptionProxy(Throwable t) {
+            super(ExceptionUtils.getMessageOrToString(t));
+            this.type = t.getClass().getName();
+            setStackTrace(t.getStackTrace());
+        }
+
+        @Override
+        public String toString() {
+            return type + ": " + getMessage();
+        }
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingIOFunction.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingIOFunction.java
new file mode 100644
index 0000000000..5829ae2cee
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingIOFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+@FunctionalInterface
+public interface ThrowingIOFunction<I, R> {
+    R process(I input) throws IOException;
+
+    @SuppressWarnings("Duplicates")
+    static <I, R> Function<I, R> asUnchecked(ThrowingIOFunction<I, R> consumer) {
+        return input -> {
+            try {
+                return consumer.process(input);
+            } catch (Exception e) {
+                throw new UncheckedExecutionException(e);
+            }
+        };
+    }
+}

Reply via email to