This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch classloader-debug
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 93ef76fd4cd852491ea63672826792f5f15e2e02
Author: tsreaper <[email protected]>
AuthorDate: Wed Aug 7 16:40:53 2024 +0800

    debug
---
 .../apache/paimon/predicate/PredicateBuilder.java  |  32 +++
 .../flink/util/FlinkUserCodeClassLoaders.java      | 241 +++++++++++++++++++++
 2 files changed, 273 insertions(+)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
index c54fc31b1..6568a05e6 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
@@ -230,18 +230,39 @@ public class PredicateBuilder {
         }
         switch (literalType.getTypeRoot()) {
             case BOOLEAN:
+                if (o instanceof String) {
+                    o = Boolean.parseBoolean((String) o);
+                }
                 return o;
             case BIGINT:
+                if (o instanceof String) {
+                    return Long.parseLong((String) o);
+                }
                 return ((Number) o).longValue();
             case DOUBLE:
+                if (o instanceof String) {
+                    return Double.parseDouble((String) o);
+                }
                 return ((Number) o).doubleValue();
             case TINYINT:
+                if (o instanceof String) {
+                    return Byte.parseByte((String) o);
+                }
                 return ((Number) o).byteValue();
             case SMALLINT:
+                if (o instanceof String) {
+                    return Short.parseShort((String) o);
+                }
                 return ((Number) o).shortValue();
             case INTEGER:
+                if (o instanceof String) {
+                    return Integer.parseInt((String) o);
+                }
                 return ((Number) o).intValue();
             case FLOAT:
+                if (o instanceof String) {
+                    return Float.parseFloat((String) o);
+                }
                 return ((Number) o).floatValue();
             case VARCHAR:
                 return BinaryString.fromString(o.toString());
@@ -258,6 +279,8 @@ public class PredicateBuilder {
                     localDate = ((Date) o).toLocalDate();
                 } else if (o instanceof LocalDate) {
                     localDate = (LocalDate) o;
+                } else if (o instanceof String) {
+                    localDate = LocalDate.parse((String) o);
                 } else {
                     throw new UnsupportedOperationException(
                             "Unexpected date literal of class " + 
o.getClass().getName());
@@ -271,6 +294,8 @@ public class PredicateBuilder {
                     localTime = ((java.sql.Time) o).toLocalTime();
                 } else if (o instanceof java.time.LocalTime) {
                     localTime = (java.time.LocalTime) o;
+                } else if (o instanceof String) {
+                    localTime = LocalTime.parse((String) o);
                 } else {
                     throw new UnsupportedOperationException(
                             "Unexpected time literal of class " + 
o.getClass().getName());
@@ -281,6 +306,9 @@ public class PredicateBuilder {
                 DecimalType decimalType = (DecimalType) literalType;
                 int precision = decimalType.getPrecision();
                 int scale = decimalType.getScale();
+                if (o instanceof String) {
+                    return Decimal.fromBigDecimal(new BigDecimal((String) o), 
precision, scale);
+                }
                 return Decimal.fromBigDecimal((BigDecimal) o, precision, 
scale);
             case TIMESTAMP_WITHOUT_TIME_ZONE:
                 if (o instanceof java.sql.Timestamp) {
@@ -291,6 +319,8 @@ public class PredicateBuilder {
                     return Timestamp.fromLocalDateTime(dateTime);
                 } else if (o instanceof LocalDateTime) {
                     return Timestamp.fromLocalDateTime((LocalDateTime) o);
+                } else if (o instanceof String) {
+                    return 
Timestamp.fromLocalDateTime(LocalDateTime.parse((String) o));
                 } else {
                     throw new UnsupportedOperationException(
                             String.format(
@@ -303,6 +333,8 @@ public class PredicateBuilder {
                     return Timestamp.fromInstant(timestamp.toInstant());
                 } else if (o instanceof Instant) {
                     return Timestamp.fromInstant((Instant) o);
+                } else if (o instanceof String) {
+                    return Timestamp.fromInstant(Instant.parse((String) o));
                 } else {
                     throw new UnsupportedOperationException(
                             String.format(
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java
new file mode 100644
index 000000000..bf70ffe8c
--- /dev/null
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Enumeration;
+import java.util.function.Consumer;
+
+import static 
org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
+
+/** Gives the URLClassLoader a nicer name for debugging purposes. */
+@Internal
+public class FlinkUserCodeClassLoaders {
+
+    private FlinkUserCodeClassLoaders() {}
+
+    public static MutableURLClassLoader parentFirst(
+            URL[] urls,
+            ClassLoader parent,
+            Consumer<Throwable> classLoadingExceptionHandler,
+            boolean checkClassLoaderLeak) {
+        FlinkUserCodeClassLoader classLoader =
+                new ParentFirstClassLoader(urls, parent, 
classLoadingExceptionHandler);
+        return wrapWithSafetyNet(classLoader, checkClassLoaderLeak);
+    }
+
+    public static MutableURLClassLoader childFirst(
+            URL[] urls,
+            ClassLoader parent,
+            String[] alwaysParentFirstPatterns,
+            Consumer<Throwable> classLoadingExceptionHandler,
+            boolean checkClassLoaderLeak) {
+        FlinkUserCodeClassLoader classLoader =
+                new ChildFirstClassLoader(
+                        urls, parent, alwaysParentFirstPatterns, 
classLoadingExceptionHandler);
+        return wrapWithSafetyNet(classLoader, checkClassLoaderLeak);
+    }
+
+    public static MutableURLClassLoader create(
+            final URL[] urls, final ClassLoader parent, final ReadableConfig 
config) {
+        final String[] alwaysParentFirstLoaderPatterns =
+                CoreOptions.getParentFirstLoaderPatterns(config);
+        final String classLoaderResolveOrder = 
config.get(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
+        final FlinkUserCodeClassLoaders.ResolveOrder resolveOrder =
+                
FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder);
+        final boolean checkClassloaderLeak = 
config.get(CoreOptions.CHECK_LEAKED_CLASSLOADER);
+        return create(
+                resolveOrder,
+                urls,
+                parent,
+                alwaysParentFirstLoaderPatterns,
+                NOOP_EXCEPTION_HANDLER,
+                checkClassloaderLeak);
+    }
+
+    public static MutableURLClassLoader create(
+            ResolveOrder resolveOrder,
+            URL[] urls,
+            ClassLoader parent,
+            String[] alwaysParentFirstPatterns,
+            Consumer<Throwable> classLoadingExceptionHandler,
+            boolean checkClassLoaderLeak) {
+
+        switch (resolveOrder) {
+            case CHILD_FIRST:
+                return childFirst(
+                        urls,
+                        parent,
+                        alwaysParentFirstPatterns,
+                        classLoadingExceptionHandler,
+                        checkClassLoaderLeak);
+            case PARENT_FIRST:
+                return parentFirst(
+                        urls, parent, classLoadingExceptionHandler, 
checkClassLoaderLeak);
+            default:
+                throw new IllegalArgumentException(
+                        "Unknown class resolution order: " + resolveOrder);
+        }
+    }
+
+    private static MutableURLClassLoader wrapWithSafetyNet(
+            FlinkUserCodeClassLoader classLoader, boolean check) {
+        return check
+                ? new SafetyNetWrapperClassLoader(classLoader, 
classLoader.getParent())
+                : classLoader;
+    }
+
+    /** Class resolution order for Flink URL {@link ClassLoader}. */
+    public enum ResolveOrder {
+        CHILD_FIRST,
+        PARENT_FIRST;
+
+        public static ResolveOrder fromString(String resolveOrder) {
+            if (resolveOrder.equalsIgnoreCase("parent-first")) {
+                return PARENT_FIRST;
+            } else if (resolveOrder.equalsIgnoreCase("child-first")) {
+                return CHILD_FIRST;
+            } else {
+                throw new IllegalArgumentException("Unknown resolve order: " + 
resolveOrder);
+            }
+        }
+    }
+
+    /**
+     * Regular URLClassLoader that first loads from the parent and only after 
that from the URLs.
+     */
+    @Internal
+    public static class ParentFirstClassLoader extends 
FlinkUserCodeClassLoader {
+
+        ParentFirstClassLoader(
+                URL[] urls, ClassLoader parent, Consumer<Throwable> 
classLoadingExceptionHandler) {
+            super(urls, parent, classLoadingExceptionHandler);
+        }
+
+        static {
+            ClassLoader.registerAsParallelCapable();
+        }
+
+        @Override
+        public MutableURLClassLoader copy() {
+            return new ParentFirstClassLoader(getURLs(), getParent(), 
classLoadingExceptionHandler);
+        }
+    }
+
+    /**
+     * Ensures that holding a reference on the context class loader outliving 
the scope of user code
+     * does not prevent the user classloader to be garbage collected 
(FLINK-16245).
+     *
+     * <p>This classloader delegates to the actual user classloader. Upon 
{@link #close()}, the
+     * delegate is nulled and can be garbage collected. Additional class 
resolution will be resolved
+     * solely through the bootstrap classloader and most likely result in 
ClassNotFound exceptions.
+     */
+    @Internal
+    public static class SafetyNetWrapperClassLoader extends 
MutableURLClassLoader {
+        private static final Logger LOG =
+                LoggerFactory.getLogger(SafetyNetWrapperClassLoader.class);
+
+        protected volatile FlinkUserCodeClassLoader inner;
+
+        protected SafetyNetWrapperClassLoader(FlinkUserCodeClassLoader inner, 
ClassLoader parent) {
+            super(new URL[0], parent);
+            this.inner = inner;
+
+            System.out.println("Hello create SafetyNetWrapperClassLoader " + 
this + ", " + inner);
+            new RuntimeException().printStackTrace();
+        }
+
+        @Override
+        public void close() {
+            final FlinkUserCodeClassLoader inner = this.inner;
+            if (inner != null) {
+                System.out.println("OK closing SafetyNetWrapperClassLoader " + 
this + ", " + inner);
+                new RuntimeException().printStackTrace();
+
+                try {
+                    inner.close();
+                } catch (IOException e) {
+                    LOG.warn("Could not close user classloader", e);
+                }
+            }
+            this.inner = null;
+        }
+
+        private FlinkUserCodeClassLoader ensureInner() {
+            if (inner == null) {
+                throw new IllegalStateException(
+                        "Trying to access closed classloader. Please check if 
you store "
+                                + "classloaders directly or indirectly in 
static fields. If the stacktrace suggests that the leak "
+                                + "occurs in a third party library and cannot 
be fixed immediately, you can disable this check "
+                                + "with the configuration '"
+                                + CoreOptions.CHECK_LEAKED_CLASSLOADER.key()
+                                + "'.");
+            }
+            return inner;
+        }
+
+        @Override
+        public Class<?> loadClass(String name) throws ClassNotFoundException {
+            return ensureInner().loadClass(name);
+        }
+
+        @Override
+        protected Class<?> loadClass(String name, boolean resolve) throws 
ClassNotFoundException {
+            // called for dynamic class loading
+            return ensureInner().loadClass(name, resolve);
+        }
+
+        @Override
+        public void addURL(URL url) {
+            ensureInner().addURL(url);
+        }
+
+        /**
+         * Creates a new wrapper around a copy of the delegate, sharing this wrapper's parent.
+         *
+         * @throws IllegalStateException if this wrapper has already been closed
+         */
+        @Override
+        public MutableURLClassLoader copy() {
+            // Use ensureInner() like every other delegating method: after close() the field is
+            // null, and dereferencing it directly would surface as a bare NullPointerException.
+            return new SafetyNetWrapperClassLoader(
+                    (FlinkUserCodeClassLoader) ensureInner().copy(), getParent());
+        }
+
+        @Override
+        public URL getResource(String name) {
+            return ensureInner().getResource(name);
+        }
+
+        @Override
+        public Enumeration<URL> getResources(String name) throws IOException {
+            return ensureInner().getResources(name);
+        }
+
+        @Override
+        public URL[] getURLs() {
+            return ensureInner().getURLs();
+        }
+
+        static {
+            ClassLoader.registerAsParallelCapable();
+        }
+    }
+}

Reply via email to