This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 4616f7d [FLINK-19398][connectors/hive] Fix the failure when creating
hive connector from userclassloader
4616f7d is described below
commit 4616f7d954275081a9a97c5745d5f55072241d17
Author: Yun Gao <[email protected]>
AuthorDate: Thu Dec 3 16:11:35 2020 +0800
[FLINK-19398][connectors/hive] Fix the failure when creating hive connector
from userclassloader
This closes #14286
---
.../bulk/HadoopPathBasedPartFileWriterTest.java | 53 ------------
.../bulk/TestHadoopPathBasedBulkWriterFactory.java | 82 +++++++++++++++++++
.../HadoopPathBasedBulkFormatBuilderTest.java | 95 ++++++++++++++++++++++
.../functions/sink/filesystem/BucketFactory.java | 2 +-
.../api/functions/sink/filesystem/Buckets.java | 2 +-
.../sink/filesystem/DefaultBucketFactoryImpl.java | 2 +-
6 files changed, 180 insertions(+), 56 deletions(-)
diff --git
a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java
b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java
index 3ee14ee..c0d337a 100644
---
a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java
+++
b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java
@@ -26,12 +26,9 @@ import
org.apache.flink.streaming.api.functions.sink.filesystem.TestStreamingFil
import
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -140,54 +137,4 @@ public class HadoopPathBasedPartFileWriterTest extends
AbstractTestBase {
return lines;
}
}
-
- private static class TestHadoopPathBasedBulkWriterFactory implements
HadoopPathBasedBulkWriter.Factory<String> {
-
- @Override
- public HadoopPathBasedBulkWriter<String> create(Path
targetFilePath, Path inProgressFilePath) {
- try {
- FileSystem fileSystem =
FileSystem.get(inProgressFilePath.toUri(), new Configuration());
- FSDataOutputStream output =
fileSystem.create(inProgressFilePath);
- return new
FSDataOutputStreamBulkWriterHadoop(output);
- } catch (IOException e) {
- ExceptionUtils.rethrow(e);
- }
-
- return null;
- }
- }
-
- private static class FSDataOutputStreamBulkWriterHadoop implements
HadoopPathBasedBulkWriter<String> {
- private final FSDataOutputStream outputStream;
-
- public FSDataOutputStreamBulkWriterHadoop(FSDataOutputStream
outputStream) {
- this.outputStream = outputStream;
- }
-
- @Override
- public long getSize() throws IOException {
- return outputStream.getPos();
- }
-
- @Override
- public void dispose() {
- IOUtils.closeQuietly(outputStream);
- }
-
- @Override
- public void addElement(String element) throws IOException {
- outputStream.writeBytes(element + "\n");
- }
-
- @Override
- public void flush() throws IOException {
- outputStream.flush();
- }
-
- @Override
- public void finish() throws IOException {
- outputStream.flush();
- outputStream.close();
- }
- }
}
diff --git
a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/TestHadoopPathBasedBulkWriterFactory.java
b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/TestHadoopPathBasedBulkWriterFactory.java
new file mode 100644
index 0000000..17b2aa9
--- /dev/null
+++
b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/TestHadoopPathBasedBulkWriterFactory.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.hadoop.bulk;
+
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * A {@link HadoopPathBasedBulkWriter.Factory} implementation used in tests.
+ */
+public class TestHadoopPathBasedBulkWriterFactory implements
HadoopPathBasedBulkWriter.Factory<String> {
+
+ @Override
+ public HadoopPathBasedBulkWriter<String> create(Path targetFilePath,
Path inProgressFilePath) {
+ try {
+ FileSystem fileSystem =
FileSystem.get(inProgressFilePath.toUri(), new Configuration());
+ FSDataOutputStream output =
fileSystem.create(inProgressFilePath);
+ return new FSDataOutputStreamBulkWriterHadoop(output);
+ } catch (IOException e) {
+ ExceptionUtils.rethrow(e);
+ }
+
+ return null;
+ }
+
+ private static class FSDataOutputStreamBulkWriterHadoop implements
HadoopPathBasedBulkWriter<String> {
+ private final FSDataOutputStream outputStream;
+
+ public FSDataOutputStreamBulkWriterHadoop(FSDataOutputStream
outputStream) {
+ this.outputStream = outputStream;
+ }
+
+ @Override
+ public long getSize() throws IOException {
+ return outputStream.getPos();
+ }
+
+ @Override
+ public void dispose() {
+ IOUtils.closeQuietly(outputStream);
+ }
+
+ @Override
+ public void addElement(String element) throws IOException {
+ outputStream.writeBytes(element + "\n");
+ }
+
+ @Override
+ public void flush() throws IOException {
+ outputStream.flush();
+ }
+
+ @Override
+ public void finish() throws IOException {
+ outputStream.flush();
+ outputStream.close();
+ }
+ }
+}
diff --git
a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilderTest.java
b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilderTest.java
new file mode 100644
index 0000000..1ff8ce2
--- /dev/null
+++
b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilderTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.formats.hadoop.bulk.HadoopPathBasedBulkWriter;
+import
org.apache.flink.formats.hadoop.bulk.TestHadoopPathBasedBulkWriterFactory;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.util.FlinkUserCodeClassLoader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assume;
+import org.junit.Test;
+
+import java.lang.reflect.Constructor;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests the behaviors of {@link HadoopPathBasedBulkFormatBuilder}.
+ */
+public class HadoopPathBasedBulkFormatBuilderTest {
+
+ /**
+ * Tests whether we can create {@link HadoopPathBasedBulkFormatBuilder}
within the user classloader.
+ * It mainly verifies that we have fixed the issue raised in
https://issues.apache.org/jira/browse/FLINK-19398.
+ */
+ @Test
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public void testCreatingBuildWithinUserClassLoader() throws Exception {
+ ClassLoader appClassLoader = getClass().getClassLoader();
+ Assume.assumeTrue(appClassLoader instanceof URLClassLoader);
+
+ ClassLoader userClassLoader = new
SpecifiedChildFirstUserClassLoader(
+
HadoopPathBasedBulkFormatBuilder.class.getName(),
+ appClassLoader,
+ ((URLClassLoader) appClassLoader).getURLs());
+
+ Class<HadoopPathBasedBulkFormatBuilder>
userHadoopFormatBuildClass =
+ (Class<HadoopPathBasedBulkFormatBuilder>)
userClassLoader.loadClass(
+
HadoopPathBasedBulkFormatBuilder.class.getName());
+ Constructor<?> constructor =
userHadoopFormatBuildClass.getConstructor(
+ Path.class,
+ HadoopPathBasedBulkWriter.Factory.class,
+ Configuration.class,
+ BucketAssigner.class);
+ Object hadoopFormatBuilder = constructor.newInstance(
+ new Path("/tmp"),
+ new TestHadoopPathBasedBulkWriterFactory(),
+ new Configuration(),
+ new DateTimeBucketAssigner<>());
+
+ Buckets<String, String> buckets = (Buckets<String, String>)
userHadoopFormatBuildClass
+ .getMethod("createBuckets", int.class)
+ .invoke(hadoopFormatBuilder, 0);
+ assertNotNull(buckets);
+ }
+
+ private static class SpecifiedChildFirstUserClassLoader extends
FlinkUserCodeClassLoader {
+
+ private final String specifiedClassName;
+
+ protected SpecifiedChildFirstUserClassLoader(String
specifiedClassName, ClassLoader parent, URL[] urls) {
+ super(urls, parent);
+ this.specifiedClassName = specifiedClassName;
+ }
+
+ @Override
+ protected Class<?> loadClassWithoutExceptionHandling(String
name, boolean resolve) throws ClassNotFoundException {
+ if (name.equals(specifiedClassName)) {
+ return findClass(name);
+ } else {
+ return
super.loadClassWithoutExceptionHandling(name, resolve);
+ }
+ }
+ }
+}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
index 6423627..bc51f21 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
@@ -28,7 +28,7 @@ import java.io.Serializable;
* A factory able to create {@link Bucket buckets} for the {@link
StreamingFileSink}.
*/
@Internal
-interface BucketFactory<IN, BucketID> extends Serializable {
+public interface BucketFactory<IN, BucketID> extends Serializable {
Bucket<IN, BucketID> getNewBucket(
final int subtaskIndex,
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index f936348..eeed2c6 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -91,7 +91,7 @@ public class Buckets<IN, BucketID> {
* @param bucketWriter The {@link BucketWriter} to be used when writing
data.
* @param rollingPolicy The {@link RollingPolicy} as specified by the
user.
*/
- Buckets(
+ public Buckets(
final Path basePath,
final BucketAssigner<IN, BucketID> bucketAssigner,
final BucketFactory<IN, BucketID> bucketFactory,
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
index bb20b97..be992ed 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
@@ -27,7 +27,7 @@ import java.io.IOException;
* A factory returning {@link Bucket buckets}.
*/
@Internal
-class DefaultBucketFactoryImpl<IN, BucketID> implements BucketFactory<IN,
BucketID> {
+public class DefaultBucketFactoryImpl<IN, BucketID> implements
BucketFactory<IN, BucketID> {
private static final long serialVersionUID = 1L;