This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8846901229 [lance] Add support for HadoopFileIO (#5887)
8846901229 is described below
commit 884690122932ffd77f36819c7e357670ba655c37
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Jul 14 22:50:34 2025 +0800
[lance] Add support for HadoopFileIO (#5887)
---
.../main/java/org/apache/paimon/options/Options.java | 5 +++++
.../org/apache/paimon/fs/hadoop/HadoopFileIO.java | 5 +++++
.../org/apache/paimon/format/lance/LanceUtils.java | 20 +++++++++++++++++---
3 files changed, 27 insertions(+), 3 deletions(-)
diff --git a/paimon-api/src/main/java/org/apache/paimon/options/Options.java b/paimon-api/src/main/java/org/apache/paimon/options/Options.java
index 46e70e22af..178adc7545 100644
--- a/paimon-api/src/main/java/org/apache/paimon/options/Options.java
+++ b/paimon-api/src/main/java/org/apache/paimon/options/Options.java
@@ -69,6 +69,11 @@ public class Options implements Serializable {
map2.forEach(this::setString);
}
+ public Options(Iterable<Map.Entry<String, String>> map) {
+ this();
+ map.forEach(entry -> setString(entry.getKey(), entry.getValue()));
+ }
+
public static Options fromMap(Map<String, String> map) {
return new Options(map);
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
index 8ecea5ed3e..5c5acb7259 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
@@ -31,6 +31,7 @@ import org.apache.paimon.utils.FunctionWithException;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ReflectionUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -71,6 +72,10 @@ public class HadoopFileIO implements FileIO {
this.hadoopConf = new SerializableConfiguration(context.hadoopConf());
}
+ public Configuration hadoopConf() {
+ return hadoopConf.get();
+ }
+
@Override
public SeekableInputStream newInputStream(Path path) throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);
diff --git a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceUtils.java b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceUtils.java
index c102345829..10b181792c 100644
--- a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceUtils.java
+++ b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceUtils.java
@@ -21,6 +21,7 @@ package org.apache.paimon.format.lance;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PluginFileIO;
+import org.apache.paimon.fs.hadoop.HadoopFileIO;
import org.apache.paimon.jindo.JindoFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.oss.OSSFileIO;
@@ -36,8 +37,9 @@ import java.util.Map;
public class LanceUtils {
private static final Class<?> ossFileIOKlass;
- private static final Class<?> pluginFileIO;
+ private static final Class<?> pluginFileIOKlass;
private static final Class<?> jindoFileIOKlass;
+ private static final Class<?> hadoopFileIOKlass;
static {
Class<?> klass;
@@ -60,7 +62,14 @@ public class LanceUtils {
} catch (ClassNotFoundException | NoClassDefFoundError e) {
klass = null;
}
- pluginFileIO = klass;
+ pluginFileIOKlass = klass;
+
+ try {
+ klass = Class.forName("org.apache.paimon.fs.hadoop.HadoopFileIO");
+ } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ klass = null;
+ }
+ hadoopFileIOKlass = klass;
}
public static Pair<Path, Map<String, String>> toLanceSpecified(FileIO
fileIO, Path path) {
@@ -81,8 +90,10 @@ public class LanceUtils {
originOptions = ((OSSFileIO) fileIO).hadoopOptions();
} else if (jindoFileIOKlass != null &&
jindoFileIOKlass.isInstance(fileIO)) {
originOptions = ((JindoFileIO) fileIO).hadoopOptions();
- } else if (pluginFileIO != null && pluginFileIO.isInstance(fileIO)) {
+ } else if (pluginFileIOKlass != null &&
pluginFileIOKlass.isInstance(fileIO)) {
originOptions = ((PluginFileIO) fileIO).options();
+ } else if (hadoopFileIOKlass != null &&
hadoopFileIOKlass.isInstance(fileIO)) {
+ originOptions = new Options(((HadoopFileIO) fileIO).hadoopConf());
} else {
originOptions = new Options();
}
@@ -90,6 +101,9 @@ public class LanceUtils {
Path converted = path;
Map<String, String> storageOptions = new HashMap<>();
if ("oss".equals(schema)) {
+ assert originOptions.containsKey("fs.oss.endpoint");
+ assert originOptions.containsKey("fs.oss.accessKeyId");
+ assert originOptions.containsKey("fs.oss.accessKeySecret");
storageOptions.put(
"endpoint",
"https://" + uri.getHost() + "." +
originOptions.get("fs.oss.endpoint"));