This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 4616f7d  [FLINK-19398][connectors/hive] Fix the failure when creating 
hive connector from userclassloader
4616f7d is described below

commit 4616f7d954275081a9a97c5745d5f55072241d17
Author: Yun Gao <[email protected]>
AuthorDate: Thu Dec 3 16:11:35 2020 +0800

    [FLINK-19398][connectors/hive] Fix the failure when creating hive connector 
from userclassloader
    
    This closes #14286
---
 .../bulk/HadoopPathBasedPartFileWriterTest.java    | 53 ------------
 .../bulk/TestHadoopPathBasedBulkWriterFactory.java | 82 +++++++++++++++++++
 .../HadoopPathBasedBulkFormatBuilderTest.java      | 95 ++++++++++++++++++++++
 .../functions/sink/filesystem/BucketFactory.java   |  2 +-
 .../api/functions/sink/filesystem/Buckets.java     |  2 +-
 .../sink/filesystem/DefaultBucketFactoryImpl.java  |  2 +-
 6 files changed, 180 insertions(+), 56 deletions(-)

diff --git 
a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java
 
b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java
index 3ee14ee..c0d337a 100644
--- 
a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java
+++ 
b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java
@@ -26,12 +26,9 @@ import 
org.apache.flink.streaming.api.functions.sink.filesystem.TestStreamingFil
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
 import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.IOUtils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -140,54 +137,4 @@ public class HadoopPathBasedPartFileWriterTest extends 
AbstractTestBase {
                        return lines;
                }
        }
-
-       private static class TestHadoopPathBasedBulkWriterFactory implements 
HadoopPathBasedBulkWriter.Factory<String> {
-
-               @Override
-               public HadoopPathBasedBulkWriter<String> create(Path 
targetFilePath, Path inProgressFilePath) {
-                       try {
-                               FileSystem fileSystem = 
FileSystem.get(inProgressFilePath.toUri(), new Configuration());
-                               FSDataOutputStream output = 
fileSystem.create(inProgressFilePath);
-                               return new 
FSDataOutputStreamBulkWriterHadoop(output);
-                       } catch (IOException e) {
-                               ExceptionUtils.rethrow(e);
-                       }
-
-                       return null;
-               }
-       }
-
-       private static class FSDataOutputStreamBulkWriterHadoop implements 
HadoopPathBasedBulkWriter<String> {
-               private final FSDataOutputStream outputStream;
-
-               public FSDataOutputStreamBulkWriterHadoop(FSDataOutputStream 
outputStream) {
-                       this.outputStream = outputStream;
-               }
-
-               @Override
-               public long getSize() throws IOException {
-                       return outputStream.getPos();
-               }
-
-               @Override
-               public void dispose() {
-                       IOUtils.closeQuietly(outputStream);
-               }
-
-               @Override
-               public void addElement(String element) throws IOException {
-                       outputStream.writeBytes(element + "\n");
-               }
-
-               @Override
-               public void flush() throws IOException {
-                       outputStream.flush();
-               }
-
-               @Override
-               public void finish() throws IOException {
-                       outputStream.flush();
-                       outputStream.close();
-               }
-       }
 }
diff --git 
a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/TestHadoopPathBasedBulkWriterFactory.java
 
b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/TestHadoopPathBasedBulkWriterFactory.java
new file mode 100644
index 0000000..17b2aa9
--- /dev/null
+++ 
b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/TestHadoopPathBasedBulkWriterFactory.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.hadoop.bulk;
+
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * A {@link HadoopPathBasedBulkWriter.Factory} implementation used in tests.
+ */
+public class TestHadoopPathBasedBulkWriterFactory implements 
HadoopPathBasedBulkWriter.Factory<String> {
+
+       @Override
+       public HadoopPathBasedBulkWriter<String> create(Path targetFilePath, 
Path inProgressFilePath) {
+               try {
+                       FileSystem fileSystem = 
FileSystem.get(inProgressFilePath.toUri(), new Configuration());
+                       FSDataOutputStream output = 
fileSystem.create(inProgressFilePath);
+                       return new FSDataOutputStreamBulkWriterHadoop(output);
+               } catch (IOException e) {
+                       ExceptionUtils.rethrow(e);
+               }
+
+               return null;
+       }
+
+       private static class FSDataOutputStreamBulkWriterHadoop implements 
HadoopPathBasedBulkWriter<String> {
+               private final FSDataOutputStream outputStream;
+
+               public FSDataOutputStreamBulkWriterHadoop(FSDataOutputStream 
outputStream) {
+                       this.outputStream = outputStream;
+               }
+
+               @Override
+               public long getSize() throws IOException {
+                       return outputStream.getPos();
+               }
+
+               @Override
+               public void dispose() {
+                       IOUtils.closeQuietly(outputStream);
+               }
+
+               @Override
+               public void addElement(String element) throws IOException {
+                       outputStream.writeBytes(element + "\n");
+               }
+
+               @Override
+               public void flush() throws IOException {
+                       outputStream.flush();
+               }
+
+               @Override
+               public void finish() throws IOException {
+                       outputStream.flush();
+                       outputStream.close();
+               }
+       }
+}
diff --git 
a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilderTest.java
 
b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilderTest.java
new file mode 100644
index 0000000..1ff8ce2
--- /dev/null
+++ 
b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilderTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.formats.hadoop.bulk.HadoopPathBasedBulkWriter;
+import 
org.apache.flink.formats.hadoop.bulk.TestHadoopPathBasedBulkWriterFactory;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.util.FlinkUserCodeClassLoader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assume;
+import org.junit.Test;
+
+import java.lang.reflect.Constructor;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests the behaviors of {@link HadoopPathBasedBulkFormatBuilder}.
+ */
+public class HadoopPathBasedBulkFormatBuilderTest {
+
+       /**
+        * Tests that we can create a {@link HadoopPathBasedBulkFormatBuilder} 
within a user classloader.
+        * It mainly verifies that we have fixed the issue raised in 
https://issues.apache.org/jira/browse/FLINK-19398.
+        */
+       @Test
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       public void testCreatingBuildWithinUserClassLoader() throws Exception {
+               ClassLoader appClassLoader = getClass().getClassLoader();
+               Assume.assumeTrue(appClassLoader instanceof URLClassLoader);
+
+               ClassLoader userClassLoader = new 
SpecifiedChildFirstUserClassLoader(
+                               
HadoopPathBasedBulkFormatBuilder.class.getName(),
+                               appClassLoader,
+                               ((URLClassLoader) appClassLoader).getURLs());
+
+               Class<HadoopPathBasedBulkFormatBuilder> 
userHadoopFormatBuildClass =
+                               (Class<HadoopPathBasedBulkFormatBuilder>) 
userClassLoader.loadClass(
+                                               
HadoopPathBasedBulkFormatBuilder.class.getName());
+               Constructor<?> constructor = 
userHadoopFormatBuildClass.getConstructor(
+                               Path.class,
+                               HadoopPathBasedBulkWriter.Factory.class,
+                               Configuration.class,
+                               BucketAssigner.class);
+               Object hadoopFormatBuilder = constructor.newInstance(
+                               new Path("/tmp"),
+                               new TestHadoopPathBasedBulkWriterFactory(),
+                               new Configuration(),
+                               new DateTimeBucketAssigner<>());
+
+               Buckets<String, String> buckets = (Buckets<String, String>) 
userHadoopFormatBuildClass
+                               .getMethod("createBuckets", int.class)
+                               .invoke(hadoopFormatBuilder, 0);
+               assertNotNull(buckets);
+       }
+
+       private static class SpecifiedChildFirstUserClassLoader extends 
FlinkUserCodeClassLoader {
+
+               private final String specifiedClassName;
+
+               protected SpecifiedChildFirstUserClassLoader(String 
specifiedClassName, ClassLoader parent, URL[] urls) {
+                       super(urls, parent);
+                       this.specifiedClassName = specifiedClassName;
+               }
+
+               @Override
+               protected Class<?> loadClassWithoutExceptionHandling(String 
name, boolean resolve) throws ClassNotFoundException {
+                       if (name.equals(specifiedClassName)) {
+                               return findClass(name);
+                       } else {
+                               return 
super.loadClassWithoutExceptionHandling(name, resolve);
+                       }
+               }
+       }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
index 6423627..bc51f21 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
@@ -28,7 +28,7 @@ import java.io.Serializable;
  * A factory able to create {@link Bucket buckets} for the {@link 
StreamingFileSink}.
  */
 @Internal
-interface BucketFactory<IN, BucketID> extends Serializable {
+public interface BucketFactory<IN, BucketID> extends Serializable {
 
        Bucket<IN, BucketID> getNewBucket(
                        final int subtaskIndex,
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index f936348..eeed2c6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -91,7 +91,7 @@ public class Buckets<IN, BucketID> {
         * @param bucketWriter The {@link BucketWriter} to be used when writing 
data.
         * @param rollingPolicy The {@link RollingPolicy} as specified by the 
user.
         */
-       Buckets(
+       public Buckets(
                        final Path basePath,
                        final BucketAssigner<IN, BucketID> bucketAssigner,
                        final BucketFactory<IN, BucketID> bucketFactory,
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
index bb20b97..be992ed 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
@@ -27,7 +27,7 @@ import java.io.IOException;
  * A factory returning {@link Bucket buckets}.
  */
 @Internal
-class DefaultBucketFactoryImpl<IN, BucketID> implements BucketFactory<IN, 
BucketID> {
+public class DefaultBucketFactoryImpl<IN, BucketID> implements 
BucketFactory<IN, BucketID> {
 
        private static final long serialVersionUID = 1L;
 

Reply via email to