This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 3050d373ed09956bad08eb88b75c4c35fe7a38f3 Author: Hussain Towaileb <[email protected]> AuthorDate: Mon Oct 13 21:50:16 2025 +0300 [ASTERIXDB-3657][FAIL]: handle non-serializable exceptions Ext-ref: MB-68744 Change-Id: I55e69623068a6c1759803cc699417021910237fd Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20485 Reviewed-by: Hussain Towaileb <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Tested-by: Hussain Towaileb <[email protected]> --- .../asterix/hyracks/bootstrap/CCApplication.java | 9 +++ .../asterix/hyracks/bootstrap/NCApplication.java | 9 +++ .../external/input/HDFSDataSourceFactory.java | 3 +- .../writer/HDFSExternalFileWriterFactory.java | 2 +- ...ReplacementsAwareJavaSerializationProvider.java | 82 ++++++++++++++++++++++ .../hyracks/api/util/JavaSerializationUtils.java | 39 +++++++--- .../apache/hyracks/util/ThrowingIOFunction.java | 40 +++++++++++ 7 files changed, 174 insertions(+), 10 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index 4e35ade636..1af75c168c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -25,6 +25,7 @@ import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNEC import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.ACTIVE; import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.REBALANCE_REQUIRED; import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.SHUTTING_DOWN; +import static org.apache.hyracks.api.util.JavaSerializationUtils.registerReplacement; import static org.apache.hyracks.control.common.controllers.ControllerConfig.Option.CLOUD_DEPLOYMENT; import java.io.File; @@ -120,6 +121,7 @@ import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager; import org.apache.hyracks.api.result.IJobResultCallback; +import org.apache.hyracks.api.util.JavaSerializationUtils.SerializableExceptionProxy; import org.apache.hyracks.control.cc.BaseCCApplication; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.controllers.CCConfig; @@ -137,6 +139,8 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import com.azure.storage.blob.models.BlobStorageException; + public class CCApplication extends BaseCCApplication { private static final Logger LOGGER = LogManager.getLogger(); @@ -154,6 +158,11 @@ public class CCApplication extends BaseCCApplication { ccServiceCtx.setThreadFactory( new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new LifeCycleComponentManager())); validateEnvironment(); + registerSerializationReplacements(); + } + + private void registerSerializationReplacements() { + registerReplacement(BlobStorageException.class, SerializableExceptionProxy::new); } @Override diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index 0e6a04be20..da606c0d4c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -25,6 +25,7 @@ import static org.apache.asterix.common.utils.Servlets.QUERY_SERVICE; import static org.apache.asterix.common.utils.Servlets.QUERY_STATUS; import static org.apache.asterix.common.utils.Servlets.UDF; import static org.apache.asterix.common.utils.Servlets.UDF_RECOVERY; +import static org.apache.hyracks.api.util.JavaSerializationUtils.registerReplacement; import static org.apache.hyracks.control.common.controllers.ControllerConfig.Option.CLOUD_DEPLOYMENT; import java.io.File; @@ -102,6 +103,7 @@ import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IFileDeviceResolver; import org.apache.hyracks.api.job.resource.NodeCapacity; +import org.apache.hyracks.api.util.JavaSerializationUtils.SerializableExceptionProxy; import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.control.nc.BaseNCApplication; import org.apache.hyracks.control.nc.NodeControllerService; @@ -115,6 +117,8 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import com.azure.storage.blob.models.BlobStorageException; + public class NCApplication extends BaseNCApplication { private static final Logger LOGGER = LogManager.getLogger(); protected NCExtensionManager ncExtensionManager; @@ -141,6 +145,11 @@ public class NCApplication extends BaseNCApplication { new AsterixThreadFactory(ncServiceCtx.getThreadFactory(), ncServiceCtx.getLifeCycleComponentManager())); validateEnvironment(); configurePersistedResourceRegistry(); + registerSerializationReplacements(); + } + + private void registerSerializationReplacements() { + registerReplacement(BlobStorageException.class, SerializableExceptionProxy::new); } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java index 8af73428bb..b820147c93 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java @@ -166,7 +166,8 @@ public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IExt } catch (InterruptedException ex) { throw HyracksDataException.create(ex); } catch (IOException ex) { - throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ExceptionUtils.getMessageOrToString(ex)); + throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, + ExceptionUtils.getMessageOrToString(ex)); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java index 5720fc32a5..711cd0506d 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java @@ -137,7 +137,7 @@ public class HDFSExternalFileWriterFactory implements IExternalFileWriterFactory doValidate(testFs); } } catch (IOException ex) { - throw new CompilationException(ErrorCode.EXTERNAL_SINK_ERROR, ExceptionUtils.getMessageOrToString(ex)); + throw new CompilationException(ErrorCode.EXTERNAL_SINK_ERROR, ex, ExceptionUtils.getMessageOrToString(ex)); } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ReplacementsAwareJavaSerializationProvider.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ReplacementsAwareJavaSerializationProvider.java new file mode 100644 index 0000000000..15557961a0 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ReplacementsAwareJavaSerializationProvider.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.comm; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hyracks.util.ThrowingIOFunction; + +public class ReplacementsAwareJavaSerializationProvider implements IJavaSerializationProvider { + public static final ReplacementsAwareJavaSerializationProvider INSTANCE = + new ReplacementsAwareJavaSerializationProvider(); + private static final Map<Class<?>, ThrowingIOFunction<Object, Object>> replacements = new ConcurrentHashMap<>(); + + private ReplacementsAwareJavaSerializationProvider() { + } + + @Override + public ObjectOutputStream newObjectOutputStream(OutputStream out) throws IOException { + return new ReplacementsAwareObjectOutputStream(out); + } + + public Map<Class<?>, ThrowingIOFunction<Object, Object>> getReplacements() { + return Collections.unmodifiableMap(replacements); + } + + public void registerReplacement(Class<?> clazz, ThrowingIOFunction<Object, Object> replacementFunction) { + replacements.put(clazz, replacementFunction); + } + + private static class ReplacementsAwareObjectOutputStream extends ObjectOutputStream { + public ReplacementsAwareObjectOutputStream(OutputStream out) throws IOException { + super(out); + enableReplaceObject(true); + } + + @Override + protected Object replaceObject(Object object) throws IOException { + Class<?> clazz = object.getClass(); + if (clazz.isSynthetic()) { + return super.replaceObject(object); + } + + // try exact match first (fast path) + ThrowingIOFunction<Object, Object> replacementFunction = replacements.get(clazz); + if (replacementFunction != null) { + return replacementFunction.process(object); + } + + // fallback: match by assignability (handles subclasses / interfaces) + for (Map.Entry<Class<?>, ThrowingIOFunction<Object, Object>> e : replacements.entrySet()) { + if (e.getKey().isInstance(object)) { + INSTANCE.registerReplacement(clazz, e.getValue()); + return e.getValue().process(object); + } + } + + INSTANCE.registerReplacement(clazz, super::replaceObject); + return super.replaceObject(object); + } + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java index 8e2420498e..f51feba61f 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java @@ -28,12 +28,15 @@ import java.io.ObjectStreamClass; import java.io.Serializable; import java.lang.reflect.Modifier; import java.lang.reflect.Proxy; +import java.util.Map; -import org.apache.hyracks.api.comm.DefaultJavaSerializationProvider; import org.apache.hyracks.api.comm.IJavaSerializationProvider; +import org.apache.hyracks.api.comm.ReplacementsAwareJavaSerializationProvider; +import org.apache.hyracks.util.ThrowingIOFunction; public class JavaSerializationUtils { - private static IJavaSerializationProvider serProvider = DefaultJavaSerializationProvider.INSTANCE; + private static final ReplacementsAwareJavaSerializationProvider serProvider = + ReplacementsAwareJavaSerializationProvider.INSTANCE; private JavaSerializationUtils() { } @@ -89,10 +92,6 @@ public class JavaSerializationUtils { return Class.forName(className); } - public static void setSerializationProvider(IJavaSerializationProvider serProvider) { - JavaSerializationUtils.serProvider = serProvider; - } - public static IJavaSerializationProvider getSerializationProvider() { return serProvider; } @@ -106,7 +105,7 @@ public class JavaSerializationUtils { } private static class ClassLoaderObjectInputStream extends ObjectInputStream { - private ClassLoader classLoader; + private final ClassLoader classLoader; protected ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException, SecurityException { @@ -120,7 +119,7 @@ public class JavaSerializationUtils { } @Override - protected Class<?> resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException { + protected Class<?> resolveProxyClass(String[] interfaces) throws ClassNotFoundException { ClassLoader nonPublicLoader = null; boolean hasNonPublicInterface = false; @@ -147,4 +146,28 @@ public class JavaSerializationUtils { } } } + + public static Map<Class<?>, ThrowingIOFunction<Object, Object>> getReplacements() { + return serProvider.getReplacements(); + } + + public static <T> void registerReplacement(Class<T> clazz, ThrowingIOFunction<? super T, ?> replacementFunction) { + serProvider.registerReplacement(clazz, object -> replacementFunction.process(clazz.cast(object))); + } + + public static class SerializableExceptionProxy extends Throwable { + private static final long serialVersionUID = 1L; + private final String type; + + public SerializableExceptionProxy(Throwable t) { + super(ExceptionUtils.getMessageOrToString(t)); + this.type = t.getClass().getName(); + setStackTrace(t.getStackTrace()); + } + + @Override + public String toString() { + return type + ": " + getMessage(); + } + } } diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingIOFunction.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingIOFunction.java new file mode 100644 index 0000000000..5829ae2cee --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingIOFunction.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.util; + +import java.io.IOException; +import java.util.function.Function; + +import com.google.common.util.concurrent.UncheckedExecutionException; + +@FunctionalInterface +public interface ThrowingIOFunction<I, R> { + R process(I input) throws IOException; + + @SuppressWarnings("Duplicates") + static <I, R> Function<I, R> asUnchecked(ThrowingIOFunction<I, R> consumer) { + return input -> { + try { + return consumer.process(input); + } catch (Exception e) { + throw new UncheckedExecutionException(e); + } + }; + } +}
