[ 
https://issues.apache.org/jira/browse/FLINK-10865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714549#comment-16714549
 ] 

ASF GitHub Bot commented on FLINK-10865:
----------------------------------------

asfgit closed pull request #7123: [FLINK-10865] Add Aliyun OSS file systems 
without Hadoop dependencies
URL: https://github.com/apache/flink/pull/7123
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/ops/deployment/oss.md b/docs/ops/deployment/oss.md
new file mode 100644
index 00000000000..543d5a2f176
--- /dev/null
+++ b/docs/ops/deployment/oss.md
@@ -0,0 +1,233 @@
+---
+title: "Aliyun Open Storage Service (OSS)"
+nav-title: Aliyun OSS
+nav-parent_id: deployment
+nav-pos: 9
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* ToC
+{:toc}
+
+
+## OSS: Object Storage Service
+
+[Aliyun Object Storage Service](https://www.aliyun.com/product/oss) (Aliyun OSS) is widely used, particularly among China’s cloud users, and it provides cloud object storage for a variety of use cases.
+
+The [Hadoop file system](http://hadoop.apache.org/docs/current/hadoop-aliyun/tools/hadoop-aliyun/index.html) has supported OSS since version 2.9.1. Now, you can also use OSS with Flink for **reading** and **writing data**.
+
+You can access OSS objects like this:
+
+{% highlight plain %}
+oss://<your-bucket>/<object-name>
+{% endhighlight %}
+
+The following shows how to use OSS with Flink:
+
+{% highlight java %}
+// Read from OSS bucket
+env.readTextFile("oss://<your-bucket>/<object-name>");
+
+// Write to OSS bucket
+dataSet.writeAsText("oss://<your-bucket>/<object-name>");
+
+{% endhighlight %}
+
+There are two ways to use OSS with Flink: our shaded `flink-oss-fs-hadoop` covers most scenarios. However, you may need to set up a specific Hadoop OSS FileSystem implementation if you want to use OSS as YARN's resource storage dir ([this patch](https://issues.apache.org/jira/browse/HADOOP-15919) enables YARN to use OSS). Both ways are described below.
+
+### Shaded Hadoop OSS file system (recommended)
+
+In order to use `flink-oss-fs-hadoop`, copy the respective JAR file from the `opt` directory to the `lib` directory of your Flink distribution before starting Flink, e.g.
+
+{% highlight bash %}
+cp ./opt/flink-oss-fs-hadoop-{{ site.version }}.jar ./lib/
+{% endhighlight %}
+
+`flink-oss-fs-hadoop` registers default FileSystem wrappers for URIs with the `oss://` scheme.
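+
+Once the wrapper is registered, `oss://` URIs can be used wherever Flink accepts a file system path. For example, checkpoints could be written to OSS (a minimal sketch; the bucket name is a placeholder, and we assume the standard `state.checkpoints.dir` option in `flink-conf.yaml`):
+
+{% highlight yaml %}
+state.checkpoints.dir: oss://<your-bucket>/checkpoints
+{% endhighlight %}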
+
+#### Configuration setup
+After setting up the OSS FileSystem wrapper, you need to add some 
configurations to make sure that Flink is allowed to access your OSS buckets.
+
+In order to use OSS with Flink more easily, you can use the same configuration keys in `flink-conf.yaml` as in Hadoop's `core-site.xml`.
+
+You can see the configuration keys in the [Hadoop OSS 
documentation](http://hadoop.apache.org/docs/current/hadoop-aliyun/tools/hadoop-aliyun/index.html).
+
+The following required configurations must be added to `flink-conf.yaml` (**other configurations defined in the Hadoop OSS documentation are advanced configurations used for performance tuning**):
+
+{% highlight yaml %}
+fs.oss.endpoint: Aliyun OSS endpoint to connect to
+fs.oss.accessKeyId: Aliyun access key ID
+fs.oss.accessKeySecret: Aliyun access key secret
+{% endhighlight %}
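+
+For illustration, a filled-in configuration might look as follows (the endpoint is just an example region endpoint and the keys are placeholders, not working credentials):
+
+{% highlight yaml %}
+fs.oss.endpoint: oss-cn-hangzhou.aliyuncs.com
+fs.oss.accessKeyId: <your-access-key-id>
+fs.oss.accessKeySecret: <your-access-key-secret>
+{% endhighlight %}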
+
+### Hadoop-provided OSS file system - manual setup
+This setup is a bit more complex and we recommend using our shaded Hadoop file systems instead (see above) unless required otherwise, e.g. for using OSS as YARN's resource storage dir via the `fs.defaultFS` configuration property in Hadoop's `core-site.xml`.
+
+#### Set OSS FileSystem
+You need to point Flink to a valid Hadoop configuration, which contains the following properties in `core-site.xml`:
+
+{% highlight xml %}
+<configuration>
+
+  <property>
+    <name>fs.oss.impl</name>
+    <value>org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem</value>
+  </property>
+
+  <property>
+    <name>fs.oss.endpoint</name>
+    <value>Aliyun OSS endpoint to connect to</value>
+    <description>Aliyun OSS endpoint to connect to. An up-to-date list is provided in the Aliyun OSS Documentation.</description>
+  </property>
+
+  <property>
+    <name>fs.oss.accessKeyId</name>
+    <description>Aliyun access key ID</description>
+  </property>
+
+  <property>
+    <name>fs.oss.accessKeySecret</name>
+    <description>Aliyun access key secret</description>
+  </property>
+
+  <property>
+    <name>fs.oss.buffer.dir</name>
+    <value>/tmp/oss</value>
+  </property>
+
+</configuration>
+{% endhighlight %}
+
+#### Hadoop Configuration
+
+You can specify the [Hadoop configuration](../config.html#hdfs) in various ways by pointing Flink to
+the path of the Hadoop configuration directory, for example:
+- by setting the environment variable `HADOOP_CONF_DIR`, or
+- by setting the `fs.hdfs.hadoopconf` configuration option in 
`flink-conf.yaml`:
+{% highlight yaml %}
+fs.hdfs.hadoopconf: /path/to/etc/hadoop
+{% endhighlight %}
+
+This registers `/path/to/etc/hadoop` as Hadoop's configuration directory with 
Flink. Flink will look for the `core-site.xml` and `hdfs-site.xml` files in the 
specified directory.
+
+#### Provide OSS FileSystem Dependency
+
+The Hadoop OSS FileSystem is packaged in the `hadoop-aliyun` artifact. This JAR and all its dependencies need to be added to Flink’s classpath, i.e. the classpath of both the JobManager and TaskManagers.
+
+There are multiple ways of adding JARs to Flink’s classpath, the easiest being simply to drop the JARs into Flink’s `lib` folder. Copy the `hadoop-aliyun` JAR together with all its dependencies (you can find these as part of the Hadoop binaries in `hadoop-3/share/hadoop/tools/lib`). Alternatively, you can export the directory containing these JARs as part of the `HADOOP_CLASSPATH` environment variable on all machines.
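+
+For example (a sketch assuming a Hadoop 3 binary distribution unpacked at `hadoop-3/`; adjust the paths to your setup):
+
+{% highlight bash %}
+# Option 1: drop the hadoop-aliyun JAR and its dependencies into Flink's lib folder
+cp hadoop-3/share/hadoop/tools/lib/*.jar ./lib/
+
+# Option 2: expose the directory via HADOOP_CLASSPATH on all machines
+export HADOOP_CLASSPATH=hadoop-3/share/hadoop/tools/lib/*:$HADOOP_CLASSPATH
+{% endhighlight %}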
+
+## An Example
+Below is an example that shows the result of our setup (the data was generated by the TPC-DS tool):
+
+{% highlight scala %}
+// Read from OSS bucket
+scala> val dataSet = 
benv.readTextFile("oss://<your-bucket>/50/call_center/data-m-00049")
+dataSet: org.apache.flink.api.scala.DataSet[String] = 
org.apache.flink.api.scala.DataSet@31940704
+
+scala> dataSet.print()
+1|AAAAAAAABAAAAAAA|1998-01-01|||2450952|NY 
Metro|large|2935|1670015|8AM-4PM|Bob Belcher|6|More than other authori|Shared 
others could not count fully dollars. New members ca|Julius 
Tran|3|pri|6|cally|730|Ash Hill|Boulevard|Suite 0|Oak Grove|Williamson 
County|TN|38370|United States|-5|0.11|
+2|AAAAAAAACAAAAAAA|1998-01-01|2000-12-31||2450806|Mid 
Atlantic|medium|1574|594972|8AM-8AM|Felipe Perkins|2|A bit narrow forms matter 
animals. Consist|Largely blank years put substantially deaf, new others. 
Question|Julius Durham|5|anti|1|ought|984|Center Hill|Way|Suite 
70|Midway|Williamson County|TN|31904|United States|-5|0.12|
+3|AAAAAAAACAAAAAAA|2001-01-01|||2450806|Mid 
Atlantic|medium|1574|1084486|8AM-4PM|Mark Hightower|2|Wrong troops shall work 
sometimes in a opti|Largely blank years put substantially deaf, new others. 
Question|Julius Durham|1|ought|2|able|984|Center Hill|Way|Suite 
70|Midway|Williamson County|TN|31904|United States|-5|0.01|
+4|AAAAAAAAEAAAAAAA|1998-01-01|2000-01-01||2451063|North 
Midwest|medium|10137|6578913|8AM-4PM|Larry Mccray|2|Dealers make most 
historical, direct students|Rich groups catch longer other fears; 
future,|Matthew Clifton|4|ese|3|pri|463|Pine Ridge|RD|Suite U|Five 
Points|Ziebach County|SD|56098|United States|-6|0.05|
+5|AAAAAAAAEAAAAAAA|2000-01-02|2001-12-31||2451063|North 
Midwest|small|17398|4610470|8AM-8AM|Larry Mccray|2|Dealers make most 
historical, direct students|Blue, due beds come. Politicians would not make far 
thoughts. Specifically new horses partic|Gary Colburn|4|ese|3|pri|463|Pine 
Ridge|RD|Suite U|Five Points|Ziebach County|SD|56098|United States|-6|0.12|
+6|AAAAAAAAEAAAAAAA|2002-01-01|||2451063|North 
Midwest|medium|13118|6585236|8AM-4PM|Larry Mccray|5|Silly particles could 
pro|Blue, due beds come. Politicians would not make far thoughts. Specifically 
new horses partic|Gary Colburn|5|anti|3|pri|463|Pine Ridge|RD|Suite U|Five 
Points|Ziebach County|SD|56098|United States|-6|0.11|
+7|AAAAAAAAHAAAAAAA|1998-01-01|||2451024|Pacific 
Northwest|small|6280|1739560|8AM-4PM|Alden Snyder|6|Major, formal states can 
suppor|Reduced, subsequent bases could not lik|Frederick 
Weaver|5|anti|4|ese|415|Jefferson Tenth|Court|Suite 180|Riverside|Walker 
County|AL|39231|United States|-6|0.00|
+8|AAAAAAAAIAAAAAAA|1998-01-01|2000-12-31||2450808|California|small|4766|2459256|8AM-12AM|Wayne
 Ray|6|Here possible notions arrive only. Ar|Common, free creditors should 
exper|Daniel Weller|5|anti|2|able|550|Cedar Elm|Ct.|Suite I|Fairview|Williamson 
County|TN|35709|United States|-5|0.06|
+
+scala> dataSet.count()
+res0: Long = 8
+
+// Write to OSS bucket
+scala> dataSet.writeAsText("oss://<your-bucket>/50/call_center/data-m-00049.1")
+
+scala> benv.execute("My batch program")
+res1: org.apache.flink.api.common.JobExecutionResult = 
org.apache.flink.api.common.JobExecutionResult@77476fcf
+
+scala> val newDataSet = 
benv.readTextFile("oss://<your-bucket>/50/call_center/data-m-00049.1")
+newDataSet: org.apache.flink.api.scala.DataSet[String] = 
org.apache.flink.api.scala.DataSet@40b70f31
+
+scala> newDataSet.count()
+res2: Long = 8
+
+{% endhighlight %}
+
+## Common Issues
+### Could not find OSS file system
+If your job submission fails with an exception message like the one below, please check whether our shaded JAR (`flink-oss-fs-hadoop-{{ site.version }}.jar`) is in the `lib` directory (a quick check is sketched after the stack trace below).
+
+{% highlight plain %}
+Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not 
set up JobManager
+       at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
+       at 
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
+       at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
+       at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
+       ... 7 more
+Caused by: org.apache.flink.runtime.JobException: Creating the input splits 
caused an error: Could not find a file system implementation for scheme 'oss'. 
The scheme is not directly supported by Flink and no Hadoop file system to 
support this scheme could be loaded.
+       at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:273)
+       at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:827)
+       at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:232)
+       at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
+       at 
org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1151)
+       at 
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1131)
+       at 
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:294)
+       at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
+       ... 10 more
+Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 
Could not find a file system implementation for scheme 'oss'. The scheme is not 
directly supported by Flink and no Hadoop file system to support this scheme 
could be loaded.
+       at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
+       at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
+       at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
+       at 
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:587)
+       at 
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:62)
+       at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:259)
+       ... 17 more
+Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 
Hadoop is not in the classpath/dependencies.
+       at 
org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
+       at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
+       ... 22 more
+{% endhighlight %}
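+
+A quick way to verify the JAR is in place (a sketch; run from the root of your Flink distribution):
+
+{% highlight bash %}
+ls lib/ | grep flink-oss-fs-hadoop
+{% endhighlight %}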
+
+### Missing configuration(s)
+If your job submission fails with an exception message like the one below, please check whether the corresponding configurations exist in `flink-conf.yaml`:
+
+{% highlight plain %}
+Caused by: org.apache.flink.runtime.JobException: Creating the input splits 
caused an error: Aliyun OSS endpoint should not be null or empty. Please set 
proper endpoint with 'fs.oss.endpoint'.
+       at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:273)
+       at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:827)
+       at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:232)
+       at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
+       at 
org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1151)
+       at 
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1131)
+       at 
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:294)
+       at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
+       ... 10 more
+Caused by: java.lang.IllegalArgumentException: Aliyun OSS endpoint should not 
be null or empty. Please set proper endpoint with 'fs.oss.endpoint'.
+       at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore.initialize(AliyunOSSFileSystemStore.java:145)
+       at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.initialize(AliyunOSSFileSystem.java:323)
+       at 
org.apache.flink.fs.osshadoop.OSSFileSystemFactory.create(OSSFileSystemFactory.java:87)
+       at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
+       at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
+       at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
+       at 
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:587)
+       at 
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:62)
+       at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:259)
+       ... 17 more
+{% endhighlight %}
diff --git a/flink-dist/src/main/assemblies/opt.xml 
b/flink-dist/src/main/assemblies/opt.xml
index 0d9acf3c3ae..aa18ef31278 100644
--- a/flink-dist/src/main/assemblies/opt.xml
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -140,6 +140,13 @@
                        <fileMode>0644</fileMode>
                </file>
 
+               <file>
+                       
<source>../flink-filesystems/flink-oss-fs-hadoop/target/flink-oss-fs-hadoop-${project.version}.jar</source>
+                       <outputDirectory>opt/</outputDirectory>
+                       
<destName>flink-oss-fs-hadoop-${project.version}.jar</destName>
+                       <fileMode>0644</fileMode>
+               </file>
+
                <!-- Queryable State -->
                <file>
                        
<source>../flink-queryable-state/flink-queryable-state-runtime/target/flink-queryable-state-runtime_${scala.binary.version}-${project.version}.jar</source>
diff --git a/flink-filesystems/flink-oss-fs-hadoop/pom.xml 
b/flink-filesystems/flink-oss-fs-hadoop/pom.xml
new file mode 100644
index 00000000000..b4f70a49e45
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/pom.xml
@@ -0,0 +1,144 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+       <parent>
+               <artifactId>flink-filesystems</artifactId>
+               <groupId>org.apache.flink</groupId>
+               <version>1.8-SNAPSHOT</version>
+       </parent>
+       <modelVersion>4.0.0</modelVersion>
+
+       <artifactId>flink-oss-fs-hadoop</artifactId>
+
+       <properties>
+               <fs.oss.sdk.version>3.1.0</fs.oss.sdk.version>
+       </properties>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-hadoop-fs</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-fs-hadoop-shaded</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-aliyun</artifactId>
+                       <version>${fs.hadoopshaded.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>com.aliyun.oss</groupId>
+                                       <artifactId>aliyun-oss-sdk</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.aliyun.oss</groupId>
+                       <artifactId>aliyun-sdk-oss</artifactId>
+                       <version>${fs.oss.sdk.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-fs-hadoop-shaded</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+
+                       <!-- this is merely an intermediate build artifact and 
should not be -->
+                       <!-- deployed to maven central                          
             -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-deploy-plugin</artifactId>
+                               <configuration>
+                                       <skip>true</skip>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Relocate all OSS related classes -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>shade-flink</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<shadeTestJar>false</shadeTestJar>
+                                                       <artifactSet>
+                                                               <includes>
+                                                                       
<include>*:*</include>
+                                                               </includes>
+                                                       </artifactSet>
+                                                       <relocations>
+                                                               <relocation>
+                                                                       
<pattern>org.apache.hadoop</pattern>
+                                                                       
<shadedPattern>org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop</shadedPattern>
+                                                               </relocation>
+                                                               <!-- relocate 
the OSS dependencies -->
+                                                               <relocation>
+                                                                       
<pattern>com.aliyun</pattern>
+                                                                       
<shadedPattern>org.apache.flink.fs.osshadoop.shaded.com.aliyun</shadedPattern>
+                                                               </relocation>
+                                                               <relocation>
+                                                                       
<pattern>com.aliyuncs</pattern>
+                                                                       
<shadedPattern>org.apache.flink.fs.osshadoop.shaded.com.aliyuncs</shadedPattern>
+                                                               </relocation>
+                                                       </relocations>
+                                                       <filters>
+                                                               <filter>
+                                                                       
<artifact>*</artifact>
+                                                                       
<excludes>
+                                                                               
<exclude>.gitkeep</exclude>
+                                                                               
<exclude>mime.types</exclude>
+                                                                               
<exclude>mozilla/**</exclude>
+                                                                               
<exclude>META-INF/maven/**</exclude>
+                                                                       
</excludes>
+                                                               </filter>
+                                                       </filters>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+</project>
diff --git 
a/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/OSSFileSystemFactory.java
 
b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/OSSFileSystemFactory.java
new file mode 100644
index 00000000000..52335f5891f
--- /dev/null
+++ 
b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/OSSFileSystemFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+
+import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Simple factory for the OSS file system.
+ */
+public class OSSFileSystemFactory implements FileSystemFactory {
+       private static final Logger LOG = 
LoggerFactory.getLogger(OSSFileSystemFactory.class);
+
+       private Configuration flinkConfig;
+
+       private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+       private static final Set<String> CONFIG_KEYS_TO_SHADE = 
Collections.singleton("fs.oss.credentials.provider");
+
+       private static final String FLINK_SHADING_PREFIX = 
"org.apache.flink.fs.shaded.hadoop3.";
+
+       /**
+        * To keep things simple, we use the same OSS configuration keys in Flink as in the Hadoop OSS module.
+        * Hence, every configuration key with the prefix `fs.oss` in the Flink configuration is added to the Hadoop configuration.
+        */
+       private static final String[] FLINK_CONFIG_PREFIXES = { "fs.oss."};
+
+       @Override
+       public String getScheme() {
+               return "oss";
+       }
+
+       @Override
+       public void configure(Configuration config) {
+               flinkConfig = config;
+               hadoopConfig = null;
+       }
+
+       @Override
+       public FileSystem create(URI fsUri) throws IOException {
+               this.hadoopConfig = getHadoopConfiguration();
+
+               final String scheme = fsUri.getScheme();
+               final String authority = fsUri.getAuthority();
+
+               if (scheme == null && authority == null) {
+                       fsUri = 
org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig);
+               } else if (scheme != null && authority == null) {
+                       URI defaultUri = 
org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig);
+                       if (scheme.equals(defaultUri.getScheme()) && 
defaultUri.getAuthority() != null) {
+                               fsUri = defaultUri;
+                       }
+               }
+
+               final AliyunOSSFileSystem fs = new AliyunOSSFileSystem();
+               fs.initialize(fsUri, hadoopConfig);
+               return new HadoopFileSystem(fs);
+       }
+
+       @VisibleForTesting
+       org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
+               org.apache.hadoop.conf.Configuration conf = new 
org.apache.hadoop.conf.Configuration();
+               if (flinkConfig == null) {
+                       return conf;
+               }
+
+               // read all configuration with prefix 'FLINK_CONFIG_PREFIXES'
+               for (String key : flinkConfig.keySet()) {
+                       for (String prefix : FLINK_CONFIG_PREFIXES) {
+                               if (key.startsWith(prefix)) {
+                                       String value = 
flinkConfig.getString(key, null);
+                                       conf.set(key, value);
+                                       if (CONFIG_KEYS_TO_SHADE.contains(key)) 
{
+                                               conf.set(key, 
FLINK_SHADING_PREFIX + value);
+                                       }
+
+                                       LOG.debug("Adding Flink config entry 
for {} as {} to Hadoop config", key, conf.get(key));
+                               }
+                       }
+               }
+               return conf;
+       }
+}
diff --git 
a/flink-filesystems/flink-oss-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
 
b/flink-filesystems/flink-oss-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
new file mode 100644
index 00000000000..4810f8a643b
--- /dev/null
+++ 
b/flink-filesystems/flink-oss-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.fs.osshadoop.OSSFileSystemFactory
diff --git 
a/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSFileSystemITCase.java
 
b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSFileSystemITCase.java
new file mode 100644
index 00000000000..33f96b85333
--- /dev/null
+++ 
b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSFileSystemITCase.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.assertTrue;
+
+/**
+ * Integration tests for the OSS file system support via AliyunOSSFileSystem.
+ * These tests actually read from and write to OSS.
+ */
+public class HadoopOSSFileSystemITCase extends TestLogger {
+
+       private static final String ENDPOINT = 
System.getenv("ARTIFACTS_OSS_ENDPOINT");
+       private static final String BUCKET = 
System.getenv("ARTIFACTS_OSS_BUCKET");
+       private static final String TEST_DATA_DIR = "tests-" + 
UUID.randomUUID();
+       private static final String ACCESS_KEY = 
System.getenv("ARTIFACTS_OSS_ACCESS_KEY");
+       private static final String SECRET_KEY = 
System.getenv("ARTIFACTS_OSS_SECRET_KEY");
+
+       @BeforeClass
+       public static void checkIfCredentialsArePresent() {
+               Assume.assumeTrue("Aliyun OSS endpoint not configured, skipping 
test...", ENDPOINT != null);
+               Assume.assumeTrue("Aliyun OSS bucket not configured, skipping 
test...", BUCKET != null);
+               Assume.assumeTrue("Aliyun OSS access key not configured, 
skipping test...", ACCESS_KEY != null);
+               Assume.assumeTrue("Aliyun OSS secret key not configured, 
skipping test...", SECRET_KEY != null);
+       }
+
+       @Test
+       public void testReadAndWrite() throws Exception {
+               final Configuration conf = new Configuration();
+               conf.setString("fs.oss.endpoint", ENDPOINT);
+               conf.setString("fs.oss.accessKeyId", ACCESS_KEY);
+               conf.setString("fs.oss.accessKeySecret", SECRET_KEY);
+               final String testLine = "Aliyun OSS";
+
+               FileSystem.initialize(conf);
+               final Path path = new Path("oss://" + BUCKET + '/' + 
TEST_DATA_DIR);
+               final FileSystem fs = path.getFileSystem();
+               try {
+                       for (int i = 0; i < 10; ++i) {
+                               final Path file = new Path(path.getPath() + 
"/test.data." + i);
+                               try (FSDataOutputStream out = fs.create(file, 
FileSystem.WriteMode.OVERWRITE)) {
+                                       try (OutputStreamWriter writer = new 
OutputStreamWriter(out, StandardCharsets.UTF_8)) {
+                                               writer.write(testLine);
+                                       }
+                               }
+                               try (FSDataInputStream in = fs.open(file);
+                                       InputStreamReader ir = new 
InputStreamReader(in, StandardCharsets.UTF_8);
+                                       BufferedReader reader = new 
BufferedReader(ir)) {
+                                       String line = reader.readLine();
+                                       assertEquals(testLine, line);
+                               }
+                       }
+                       assertTrue(fs.exists(path));
+                       assertEquals(10, fs.listStatus(path).length);
+               } finally {
+                       fs.delete(path, true);
+               }
+       }
+
+       @Test
+       public void testShadedConfigurations() {
+               final Configuration conf = new Configuration();
+               conf.setString("fs.oss.endpoint", ENDPOINT);
+               conf.setString("fs.oss.accessKeyId", ACCESS_KEY);
+               conf.setString("fs.oss.accessKeySecret", SECRET_KEY);
+               conf.setString("fs.oss.credentials.provider", 
"org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider");
+
+               OSSFileSystemFactory ossfsFactory = new OSSFileSystemFactory();
+               ossfsFactory.configure(conf);
+               org.apache.hadoop.conf.Configuration configuration = 
ossfsFactory.getHadoopConfiguration();
+               // should be shaded
+               
assertEquals("org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider",
+                       configuration.get("fs.oss.credentials.provider"));
+               // should not be shaded
+               assertEquals(ENDPOINT, configuration.get("fs.oss.endpoint"));
+               assertEquals(ACCESS_KEY, 
configuration.get("fs.oss.accessKeyId"));
+               assertEquals(SECRET_KEY, 
configuration.get("fs.oss.accessKeySecret"));
+       }
+}
diff --git a/flink-filesystems/pom.xml b/flink-filesystems/pom.xml
index b4742f53e96..cde662cd987 100644
--- a/flink-filesystems/pom.xml
+++ b/flink-filesystems/pom.xml
@@ -46,6 +46,7 @@ under the License.
                <module>flink-s3-fs-hadoop</module>
                <module>flink-s3-fs-presto</module>
                <module>flink-swift-fs-hadoop</module>
+               <module>flink-oss-fs-hadoop</module>
        </modules>
 
        <!-- Common dependency setup for all filesystems -->


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Implement Flink's own Aliyun OSS filesystem
> -------------------------------------------
>
>                 Key: FLINK-10865
>                 URL: https://issues.apache.org/jira/browse/FLINK-10865
>             Project: Flink
>          Issue Type: New Feature
>          Components: filesystem-connector
>    Affects Versions: 1.6.2
>            Reporter: wujinhu
>            Assignee: wujinhu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>
> Aliyun OSS is widely used among China’s cloud users, and Hadoop has supported 
> Aliyun OSS since 2.9.1. 
> This JIRA was opened to wrap AliyunOSSFileSystem in Flink (similar to the S3 support), 
> so that users can read from and write to OSS more easily in Flink. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
