henrib commented on code in PR #4194: URL: https://github.com/apache/hive/pull/4194#discussion_r1187474104
########## standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/properties/SerializationProxy.java: ########## @@ -0,0 +1,614 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.properties; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.Externalizable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.lang.reflect.Constructor; +import java.lang.reflect.Executable; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static org.apache.hadoop.hive.metastore.properties.Serializer.SERIALIZER; + +/** + * The serialization proxy template. + * <p> + * This allows a class that defines final members to be made serializable in an easy way. + * The class <em>must</em> implement: + * <ul> + * <li>a constructor that takes a DataInput (or derived class) as parameter</li> + * <li>a write method that takes a DataOutput (or derived class) as parameter</li> + * </ul> + * <p> + * One should consider the constructor as being potentially fed with an invalid stream so + * all usual checks of a public constructor should apply. + * </p> + * Standard usage is to add the Serializable interface implementation through the following 2 methods: + * <code> + * private Object writeReplace() throws ObjectStreamException { + * return new SerializationProxy<TheClass>(this); + * } + * private void readObject(ObjectInputStream in)throws IOException,ClassNotFoundException{ + * throw new InvalidObjectException("proxy required"); + * } + * </code> + * @param <T> the serializable object type + */ +public class SerializationProxy<T extends Serializable> implements Externalizable { + /** Serial version. */ + private static final long serialVersionUID = 202212281757L; + /** The logger. */ + public static final Logger LOGGER = LoggerFactory.getLogger(SerializationProxy.class); + /** The map of class names to types. */ + private static final ConcurrentMap<String, Type<?>> TYPES = new ConcurrentHashMap<>(); + /** The list of registered pre-defined classes. */ + private static final List<Type<?>> REGISTERED = new ArrayList<>(); + /** A thread local context used for arguments passing during serialization/de-serialization. */ + private static final ThreadLocal<Object[]> EXTRA_ARGUMENTS = new ThreadLocal<>(); + + /** The type of instance being read or written. */ + private transient Type<T> type = null; + /** The instance being read or written. */ + private transient T proxied = null; + + /** + * Wraps any error that may occur whilst using reflective calls. + */ + public static class ProxyException extends RuntimeException { + public ProxyException(Throwable cause) { + super(cause); + } + + public ProxyException(String msg) { + super(msg); + } + + /** + * Convert an exception to a VDBRuntimeException. + * @param cause the exception to convert + * @return the wrapping CubeException + */ + public static ProxyException convert(Throwable cause) { + if (cause instanceof ProxyException) { + return (ProxyException) cause; + } else { + return new ProxyException(cause); + } + } + } + + /** + * Constructor called from proxify.writeReplace(). + * @param proxify the instance to proxy + */ + @SuppressWarnings("unchecked") + public SerializationProxy(T proxify) { + Class<T> clazz = (Class<T>) proxify.getClass(); + type = (Type<T>) TYPES.computeIfAbsent(clazz.getName(), this::createType); + proxied = proxify; + } + + /** + * Default constructor. + */ + public SerializationProxy() { + // do nothing + } + + /** + * Sets the extra-arguments as a thread local context. + * <p>Used to pass extra arguments o constructors/write methods.</p> + * @param o the arguments + */ + public static void setExtraArguments(Object[] o) { + if (null == o) { + EXTRA_ARGUMENTS.remove(); + } else { + EXTRA_ARGUMENTS.set(o); + } + } + + /** + * Gets the extra-arguments to ctor/write executable stored in a thread local context. + * @return the arguments + */ + public static Object[] getExtraArguments() { + return EXTRA_ARGUMENTS.get(); + } + + /** + * Swaps the thread local context. + * <p>This may be used to stack up contexts during cascading calls.</p> + * @param newArgs the new arguments + * @return the down-stack caller arguments + */ + public static Object[] swapExtraArguments(Object[] newArgs) { + Object[] previous = EXTRA_ARGUMENTS.get(); + setExtraArguments(newArgs); + return previous; + } + + /** + * Unloads the proxy. + */ + public static void unload() { + EXTRA_ARGUMENTS.remove(); + TYPES.clear(); + } + + /** + * Registers a pre-defined class (known to be used throughout the whole application). + * @param <T> the type + * @param slot the slot number + * @param clazz the class + */ + public static <T extends Serializable> void registerType(final int slot, Class<T> clazz) { + synchronized (REGISTERED) { + Type<T> ntype = new Type<>(clazz); + ntype.slot = slot; + if (slot >= 255) { + throw new IllegalArgumentException(ntype + "@" + slot + ": can not register more than 254 types"); + } + List<Type<?>> types = REGISTERED; + while (types.size() <= slot) { + types.add(null); + } + if (types.get(slot) != null) { + throw new IllegalArgumentException(ntype + "@" + slot + ": slot already used by " + types.get(slot)); + } + types.set(slot, ntype); + TYPES.put(clazz.getName(), ntype); + } + } + + /** + * Called by serialization after readExternal. + * @return the proxied instance + * @throws IOException for signature compliance + */ + public Object readResolve() throws IOException { + return proxied; + } + + @Override + public void readExternal(ObjectInput in) throws IOException { + long serial = in.readLong(); + if (serial != serialVersionUID) { + throw new ProxyException("invalid serial version, got " + serial +", expected " + serialVersionUID); + } + type = readType(in); + proxied = type.proxyNew(in); + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(serialVersionUID); + writeType(type, out); + type.proxyWrite(proxied, out); + } + + /** + * Converts a serializable object to an array of bytes. + * @param serializable the object to serialize + * @param args the proxy arguments + * @return the array of bytes + * @throws ProxyException on any underlying error + */ + public static byte[] toBytes(Serializable serializable, Object... args) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(512); + final Object[] stack = SerializationProxy.swapExtraArguments(args); + try (ObjectOutput oos = new ObjectOutputStream(bos)) { + oos.writeObject(serializable); + oos.flush(); + return bos.toByteArray(); + } catch (IOException xany) { + throw ProxyException.convert(xany); + } finally { + SerializationProxy.swapExtraArguments(stack); + } + } + + /** + * Materialize a serializable object from an array of bytes. + * @param bytes the bytes + * @param args the proxy arguments + * @return the object + * @throws ProxyException on any underlying error + */ + public static <T extends Serializable> T fromBytes(byte[] bytes, Object... args) { + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + final Object[] stack = SerializationProxy.swapExtraArguments(args); + try (ObjectInput ois = new ObjectInputStream(bis)) { + return (T) ois.readObject(); + } catch (IOException | ClassNotFoundException | ClassCastException xany) { + throw ProxyException.convert(xany); + } finally { + SerializationProxy.swapExtraArguments(stack); + } + } + + /** + * Saves an object to persistent storage. + * @param file the file to write to + * @param persist the object to serialize + * @param args the proxy constructor arguments + * @return true if successful, false if file is null + * @throws ProxyException in case of low level error + */ + public static boolean write(File file, Serializable persist, Object... args) { + return SERIALIZER.write(file, persist, args); + } + + /** + * Saves an object to persistent storage. + * @param out the stream to write to + * @param persist the object to serialize + * @param args the proxy write method arguments + * @return true if successful, false if file is null + * @throws ProxyException in case of low level error + */ + public static boolean write(OutputStream out, Serializable persist, Object... args) { + return SERIALIZER.write(out, persist, args); + } + + /** + * Loads an object from the persistent storage. + * @param file the file to read from + * @param args the proxy arguments + * @return the object or null if file is null + * @throws ProxyException in case of low level error + */ + public static Serializable read(File file, Object... args) { + return SERIALIZER.read(file, args); + } + + /** + * Loads an object from the persistent storage. + * @param in the stream to read from + * @param args the proxy arguments + * @return the object or null if file is null + * @throws ProxyException in case of low level error + */ + public static <T extends Serializable> T read(InputStream in, Object... args) { + return SERIALIZER.read(in, args); + } + + /** + * Creates a Type using a class name. + * @param cname the class name + * @return a type instance + * @throws ProxyException on any underlying error + */ + protected Type<T> createType(String cname) { + try { + @SuppressWarnings("unchecked") + Class<T> clazz = (Class<T>) Class.forName(cname); + return new Type<>(clazz); + } catch (ClassNotFoundException xnotfound) { + throw ProxyException.convert(xnotfound); + } + } + + /** + * When writing out this instance, write down the canonical class name it proxifies. + * @param out the output + * @throws IOException if things go wrong + */ + protected void writeType(Type<?> type, DataOutput out) throws IOException { + int slot = type.getSlot(); + out.write(slot); + if (slot == 255) { + out.writeUTF(type.getTargetName()); + } + } + + /** + * When reading an instance, fetch the type through the canonical class name that was persisted. + * @param in the input + * @throws IOException on read error + * @throws ProxyException if class was expected to be registered but can not be found + */ + @SuppressWarnings("unchecked") + protected Type<T> readType(DataInput in) throws IOException { + final Type<T> type; + String className = "?"; + int slot = (int) in.readByte() & 0xff; + if (slot == 255) { + className = in.readUTF(); + type = (Type<T>) TYPES.computeIfAbsent(className, this::createType); + } else if (slot < REGISTERED.size()) { + type = (Type<T>) REGISTERED.get(slot); + } else { + type = null; + } + if (type == null) { + throw new ProxyException("can not resolve class @ " + slot +", " + className); + } + return type; + } + + /** + * Encapsulates the mandatory constructor and write methods for a given proxified class. + * @param <T> the proxified class + */ + protected static class Type<T extends Serializable> { + private final Constructor<T>[] ctors; + private final Method[] writes; + private transient int slot = 255; + + /** + * Creates a new instance of type. + * @param clazz the proxified class + */ + public Type(Class<T> clazz) { + ctors = typeConstructors(clazz); + writes = typeWrites(clazz); + } + + /** + * The slot number if the class is registered. + * @return the slot number, 255 means not-registered + */ + public int getSlot() { + return slot; + } + + /** + * @return the target class + */ + public String getTargetName() { + // there is always at least one ctor + return ctors[0].getDeclaringClass().getName(); + } + + /** + * Compare parameter signatures of executables. + * @param lhs left-hand side + * @param rhs right-hand side + * @return 0 if equal, +/- 1 if left </> than right + */ + private static int compareSignatures(Executable lhs, Executable rhs) { + return compareSignatures(lhs.getParameterTypes(), rhs.getParameterTypes()); + } + + /** + * Compare executables parameter signatures. + * @param lhs left-hand side executable + * @param rhs right-hand side executable + * @return 0 if equal, +/- 1 if left </> than right + */ + private static int compareSignatures(Class<?>[] lhs, Class<?>[] rhs) { + if (lhs.length < rhs.length) { + return -1; + } + if (lhs.length > rhs.length) { + return 1; + } + int cmp = 0; + // lhs.length == rhs.length + final int length = lhs.length; + for (int p = 0; p < length; ++p) { + Class<?> actual = lhs[p]; + Class<?> formal = rhs[p]; + if (formal != null && actual != null && !formal.isAssignableFrom(actual)) { + // if formal parameter is primitive and actual argument is compatible + int dist; + if (formal.isPrimitive() && (dist = CONVERTIBLES.get(formal).indexOf(actual)) >= 0) { + cmp += dist; + continue; + } + dist = formal.getName().compareTo(actual.getName()); + if (dist != 0) { + return cmp * (length - p); Review Comment: Should be `dist * (length - p)`, good catch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
