This is an automated email from the ASF dual-hosted git repository. czweng pushed a commit to branch classloader-debug in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 93ef76fd4cd852491ea63672826792f5f15e2e02 Author: tsreaper <[email protected]> AuthorDate: Wed Aug 7 16:40:53 2024 +0800 debug --- .../apache/paimon/predicate/PredicateBuilder.java | 32 +++ .../flink/util/FlinkUserCodeClassLoaders.java | 241 +++++++++++++++++++++ 2 files changed, 273 insertions(+) diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java index c54fc31b1..6568a05e6 100644 --- a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java @@ -230,18 +230,39 @@ public class PredicateBuilder { } switch (literalType.getTypeRoot()) { case BOOLEAN: + if (o instanceof String) { + o = Boolean.parseBoolean((String) o); + } return o; case BIGINT: + if (o instanceof String) { + return Long.parseLong((String) o); + } return ((Number) o).longValue(); case DOUBLE: + if (o instanceof String) { + return Double.parseDouble((String) o); + } return ((Number) o).doubleValue(); case TINYINT: + if (o instanceof String) { + return Byte.parseByte((String) o); + } return ((Number) o).byteValue(); case SMALLINT: + if (o instanceof String) { + return Short.parseShort((String) o); + } return ((Number) o).shortValue(); case INTEGER: + if (o instanceof String) { + return Integer.parseInt((String) o); + } return ((Number) o).intValue(); case FLOAT: + if (o instanceof String) { + return Float.parseFloat((String) o); + } return ((Number) o).floatValue(); case VARCHAR: return BinaryString.fromString(o.toString()); @@ -258,6 +279,8 @@ public class PredicateBuilder { localDate = ((Date) o).toLocalDate(); } else if (o instanceof LocalDate) { localDate = (LocalDate) o; + } else if (o instanceof String) { + localDate = LocalDate.parse((String) o); } else { throw new UnsupportedOperationException( "Unexpected date literal of class " + o.getClass().getName()); @@ -271,6 +294,8 @@ public class PredicateBuilder { localTime = ((java.sql.Time) o).toLocalTime(); } else if (o instanceof java.time.LocalTime) { localTime = (java.time.LocalTime) o; + } else if (o instanceof String) { + localTime = LocalTime.parse((String) o); } else { throw new UnsupportedOperationException( "Unexpected time literal of class " + o.getClass().getName()); @@ -281,6 +306,9 @@ public class PredicateBuilder { DecimalType decimalType = (DecimalType) literalType; int precision = decimalType.getPrecision(); int scale = decimalType.getScale(); + if (o instanceof String) { + return Decimal.fromBigDecimal(new BigDecimal((String) o), precision, scale); + } return Decimal.fromBigDecimal((BigDecimal) o, precision, scale); case TIMESTAMP_WITHOUT_TIME_ZONE: if (o instanceof java.sql.Timestamp) { @@ -291,6 +319,8 @@ public class PredicateBuilder { return Timestamp.fromLocalDateTime(dateTime); } else if (o instanceof LocalDateTime) { return Timestamp.fromLocalDateTime((LocalDateTime) o); + } else if (o instanceof String) { + return Timestamp.fromLocalDateTime(LocalDateTime.parse((String) o)); } else { throw new UnsupportedOperationException( String.format( @@ -303,6 +333,8 @@ public class PredicateBuilder { return Timestamp.fromInstant(timestamp.toInstant()); } else if (o instanceof Instant) { return Timestamp.fromInstant((Instant) o); + } else if (o instanceof String) { + return Timestamp.fromInstant(Instant.parse((String) o)); } else { throw new UnsupportedOperationException( String.format( diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java new file mode 100644 index 000000000..bf70ffe8c --- /dev/null +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URL; +import java.util.Enumeration; +import java.util.function.Consumer; + +import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER; + +/** Gives the URLClassLoader a nicer name for debugging purposes. */ +@Internal +public class FlinkUserCodeClassLoaders { + + private FlinkUserCodeClassLoaders() {} + + public static MutableURLClassLoader parentFirst( + URL[] urls, + ClassLoader parent, + Consumer<Throwable> classLoadingExceptionHandler, + boolean checkClassLoaderLeak) { + FlinkUserCodeClassLoader classLoader = + new ParentFirstClassLoader(urls, parent, classLoadingExceptionHandler); + return wrapWithSafetyNet(classLoader, checkClassLoaderLeak); + } + + public static MutableURLClassLoader childFirst( + URL[] urls, + ClassLoader parent, + String[] alwaysParentFirstPatterns, + Consumer<Throwable> classLoadingExceptionHandler, + boolean checkClassLoaderLeak) { + FlinkUserCodeClassLoader classLoader = + new ChildFirstClassLoader( + urls, parent, alwaysParentFirstPatterns, classLoadingExceptionHandler); + return wrapWithSafetyNet(classLoader, checkClassLoaderLeak); + } + + public static MutableURLClassLoader create( + final URL[] urls, final ClassLoader parent, final ReadableConfig config) { + final String[] alwaysParentFirstLoaderPatterns = + CoreOptions.getParentFirstLoaderPatterns(config); + final String classLoaderResolveOrder = config.get(CoreOptions.CLASSLOADER_RESOLVE_ORDER); + final FlinkUserCodeClassLoaders.ResolveOrder resolveOrder = + FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder); + final boolean checkClassloaderLeak = config.get(CoreOptions.CHECK_LEAKED_CLASSLOADER); + return create( + resolveOrder, + urls, + parent, + alwaysParentFirstLoaderPatterns, + NOOP_EXCEPTION_HANDLER, + checkClassloaderLeak); + } + + public static MutableURLClassLoader create( + ResolveOrder resolveOrder, + URL[] urls, + ClassLoader parent, + String[] alwaysParentFirstPatterns, + Consumer<Throwable> classLoadingExceptionHandler, + boolean checkClassLoaderLeak) { + + switch (resolveOrder) { + case CHILD_FIRST: + return childFirst( + urls, + parent, + alwaysParentFirstPatterns, + classLoadingExceptionHandler, + checkClassLoaderLeak); + case PARENT_FIRST: + return parentFirst( + urls, parent, classLoadingExceptionHandler, checkClassLoaderLeak); + default: + throw new IllegalArgumentException( + "Unknown class resolution order: " + resolveOrder); + } + } + + private static MutableURLClassLoader wrapWithSafetyNet( + FlinkUserCodeClassLoader classLoader, boolean check) { + return check + ? new SafetyNetWrapperClassLoader(classLoader, classLoader.getParent()) + : classLoader; + } + + /** Class resolution order for Flink URL {@link ClassLoader}. */ + public enum ResolveOrder { + CHILD_FIRST, + PARENT_FIRST; + + public static ResolveOrder fromString(String resolveOrder) { + if (resolveOrder.equalsIgnoreCase("parent-first")) { + return PARENT_FIRST; + } else if (resolveOrder.equalsIgnoreCase("child-first")) { + return CHILD_FIRST; + } else { + throw new IllegalArgumentException("Unknown resolve order: " + resolveOrder); + } + } + } + + /** + * Regular URLClassLoader that first loads from the parent and only after that from the URLs. + */ + @Internal + public static class ParentFirstClassLoader extends FlinkUserCodeClassLoader { + + ParentFirstClassLoader( + URL[] urls, ClassLoader parent, Consumer<Throwable> classLoadingExceptionHandler) { + super(urls, parent, classLoadingExceptionHandler); + } + + static { + ClassLoader.registerAsParallelCapable(); + } + + @Override + public MutableURLClassLoader copy() { + return new ParentFirstClassLoader(getURLs(), getParent(), classLoadingExceptionHandler); + } + } + + /** + * Ensures that holding a reference on the context class loader outliving the scope of user code + * does not prevent the user classloader to be garbage collected (FLINK-16245). + * + * <p>This classloader delegates to the actual user classloader. Upon {@link #close()}, the + * delegate is nulled and can be garbage collected. Additional class resolution will be resolved + * solely through the bootstrap classloader and most likely result in ClassNotFound exceptions. + */ + @Internal + public static class SafetyNetWrapperClassLoader extends MutableURLClassLoader { + private static final Logger LOG = + LoggerFactory.getLogger(SafetyNetWrapperClassLoader.class); + + protected volatile FlinkUserCodeClassLoader inner; + + protected SafetyNetWrapperClassLoader(FlinkUserCodeClassLoader inner, ClassLoader parent) { + super(new URL[0], parent); + this.inner = inner; + + System.out.println("Hello create SafetyNetWrapperClassLoader " + this + ", " + inner); + new RuntimeException().printStackTrace(); + } + + @Override + public void close() { + final FlinkUserCodeClassLoader inner = this.inner; + if (inner != null) { + System.out.println("OK closing SafetyNetWrapperClassLoader " + this + ", " + inner); + new RuntimeException().printStackTrace(); + + try { + inner.close(); + } catch (IOException e) { + LOG.warn("Could not close user classloader", e); + } + } + this.inner = null; + } + + private FlinkUserCodeClassLoader ensureInner() { + if (inner == null) { + throw new IllegalStateException( + "Trying to access closed classloader. Please check if you store " + + "classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak " + + "occurs in a third party library and cannot be fixed immediately, you can disable this check " + + "with the configuration '" + + CoreOptions.CHECK_LEAKED_CLASSLOADER.key() + + "'."); + } + return inner; + } + + @Override + public Class<?> loadClass(String name) throws ClassNotFoundException { + return ensureInner().loadClass(name); + } + + @Override + protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { + // called for dynamic class loading + return ensureInner().loadClass(name, resolve); + } + + @Override + public void addURL(URL url) { + ensureInner().addURL(url); + } + + @Override + public MutableURLClassLoader copy() { + return new SafetyNetWrapperClassLoader( + (FlinkUserCodeClassLoader) inner.copy(), getParent()); + } + + @Override + public URL getResource(String name) { + return ensureInner().getResource(name); + } + + @Override + public Enumeration<URL> getResources(String name) throws IOException { + return ensureInner().getResources(name); + } + + @Override + public URL[] getURLs() { + return ensureInner().getURLs(); + } + + static { + ClassLoader.registerAsParallelCapable(); + } + } +}
