This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2dd2b7221a [fs] JindoFileIO supports OSS additional meta information
(#5459)
2dd2b7221a is described below
commit 2dd2b7221a4cdc6388ad37c2497fda895ffc3f12
Author: 张凯旋 <[email protected]>
AuthorDate: Thu Apr 17 16:16:12 2025 +0800
[fs] JindoFileIO supports OSS additional meta information (#5459)
---
.../apache/paimon/jindo/HadoopCompliantFileIO.java | 4 +-
.../java/org/apache/paimon/jindo/JindoFileIO.java | 166 +++++++++++++++++++++
2 files changed, 168 insertions(+), 2 deletions(-)
diff --git
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
index 030d845b3b..99c80b912a 100644
---
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
+++
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
@@ -112,7 +112,7 @@ public abstract class HadoopCompliantFileIO implements
FileIO {
return getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst);
}
- private org.apache.hadoop.fs.Path path(Path path) {
+ protected org.apache.hadoop.fs.Path path(Path path) {
URI uri = path.toUri();
if (uri.getScheme().equals("oss") && uri.getUserInfo() != null) {
path = new Path("oss:/" + uri.getPath());
@@ -120,7 +120,7 @@ public abstract class HadoopCompliantFileIO implements
FileIO {
return new org.apache.hadoop.fs.Path(path.toUri());
}
- private JindoHadoopSystem getFileSystem(org.apache.hadoop.fs.Path path)
throws IOException {
+ protected JindoHadoopSystem getFileSystem(org.apache.hadoop.fs.Path path)
throws IOException {
return getFileSystemPair(path).getKey();
}
diff --git
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
index 423f6fb7ec..80a03b2e95 100644
---
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
+++
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
@@ -20,6 +20,9 @@ package org.apache.paimon.jindo;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.RemoteIterator;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.Pair;
@@ -27,11 +30,14 @@ import org.apache.paimon.utils.Pair;
import com.aliyun.jindodata.common.JindoHadoopSystem;
import com.aliyun.jindodata.dls.JindoDlsFileSystem;
import com.aliyun.jindodata.oss.JindoOssFileSystem;
+import com.aliyun.jindodata.types.JindoHadoopFileStatus;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
@@ -42,6 +48,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE;
+import static org.apache.paimon.options.CatalogOptions.FILE_IO_POPULATE_META;
/** Jindo {@link FileIO}. */
public class JindoFileIO extends HadoopCompliantFileIO {
@@ -80,6 +87,7 @@ public class JindoFileIO extends HadoopCompliantFileIO {
private Options hadoopOptions;
private boolean allowCache = true;
+ private boolean populateMeta = false;
@Override
public boolean isObjectStore() {
@@ -89,6 +97,7 @@ public class JindoFileIO extends HadoopCompliantFileIO {
@Override
public void configure(CatalogContext context) {
allowCache = context.options().get(FILE_IO_ALLOW_CACHE);
+ populateMeta = context.options().get(FILE_IO_POPULATE_META);
hadoopOptions = new Options();
//
https://github.com/aliyun/alibabacloud-jindodata/blob/master/docs/user/4.x/4.6.x/4.6.1/oss/hadoop/jindosdk_ide_hadoop.md
hadoopOptions.set("fs.oss.impl",
"com.aliyun.jindodata.oss.JindoOssFileSystem");
@@ -200,4 +209,161 @@ public class JindoFileIO extends HadoopCompliantFileIO {
return Objects.hash(options, scheme, authority);
}
}
+
+    /**
+     * Looks up the status of {@code path} and, when metadata population is enabled, wraps it
+     * together with the Jindo-specific file status.
+     *
+     * @param path file or directory to look up
+     * @return the parent implementation's status when {@code populateMeta} is off, otherwise
+     *     the extended status built by {@code getExtendedFileStatus}
+     * @throws IOException if the basic lookup or the additional metadata lookup fails
+     */
+    @Override
+    public FileStatus getFileStatus(Path path) throws IOException {
+        FileStatus plainStatus = super.getFileStatus(path);
+        if (populateMeta) {
+            return getExtendedFileStatus(getFileSystem(path(path)), plainStatus);
+        }
+        return plainStatus;
+    }
+
+    /**
+     * Lists the entries under {@code path}; when metadata population is enabled every entry is
+     * replaced by its extended counterpart, in the same order.
+     *
+     * @param path directory (or file) to list
+     * @return the parent implementation's array when {@code populateMeta} is off, otherwise a
+     *     new array of the same length holding extended statuses
+     * @throws IOException if the listing or any per-entry metadata lookup fails
+     */
+    @Override
+    public FileStatus[] listStatus(Path path) throws IOException {
+        FileStatus[] plainStatuses = super.listStatus(path);
+        if (populateMeta) {
+            JindoHadoopSystem jindoFs = getFileSystem(path(path));
+            FileStatus[] enriched = new FileStatus[plainStatuses.length];
+            int slot = 0;
+            for (FileStatus plain : plainStatuses) {
+                enriched[slot++] = getExtendedFileStatus(jindoFs, plain);
+            }
+            return enriched;
+        }
+        return plainStatuses;
+    }
+
+    /**
+     * Iterates over the files under {@code path}; when metadata population is enabled each
+     * element is wrapped with its Jindo-specific file status as it is consumed.
+     *
+     * @param path root of the listing
+     * @param recursive passed through unchanged to the parent implementation
+     * @return the parent implementation's iterator when {@code populateMeta} is off, otherwise
+     *     an iterator that delegates {@code hasNext}/{@code close} and enriches {@code next}
+     * @throws IOException if the file system lookup or opening the listing fails
+     */
+    @Override
+    public RemoteIterator<FileStatus> listFilesIterative(Path path, boolean recursive)
+            throws IOException {
+        if (!populateMeta) {
+            return super.listFilesIterative(path, recursive);
+        }
+        // Resolve the file system before opening the listing, so that a failing lookup cannot
+        // leave an already opened iterator behind without anyone closing it.
+        JindoHadoopSystem fs = getFileSystem(path(path));
+        RemoteIterator<FileStatus> basicIter = super.listFilesIterative(path, recursive);
+        return new RemoteIterator<FileStatus>() {
+            @Override
+            public boolean hasNext() throws IOException {
+                return basicIter.hasNext();
+            }
+
+            @Override
+            public FileStatus next() throws IOException {
+                // One additional metadata lookup per element, performed lazily on consumption.
+                return getExtendedFileStatus(fs, basicIter.next());
+            }
+
+            @Override
+            public void close() throws IOException {
+                basicIter.close();
+            }
+        };
+    }
+
+    /**
+     * Fetches the Jindo file status for the path of {@code status} and pairs it with the basic
+     * status.
+     *
+     * @param fs file system used for the additional lookup
+     * @param status basic status whose path is looked up again
+     * @return a wrapper holding both the basic status and the Jindo file status
+     * @throws IOException if the Jindo lookup fails
+     */
+    private ExtendedFileStatus getExtendedFileStatus(JindoHadoopSystem fs, FileStatus status)
+            throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(status.getPath());
+        // Relative paths are resolved against the file system's working directory.
+        org.apache.hadoop.fs.Path resolved =
+                hadoopPath.isAbsolute()
+                        ? hadoopPath
+                        : new org.apache.hadoop.fs.Path(fs.getWorkingDirectory(), hadoopPath);
+        return new ExtendedFileStatus(status, fs.getJindoFileStatus(resolved));
+    }
+
+    /**
+     * {@link FileStatus} that combines a basic status with an optional Jindo file status.
+     *
+     * <p>Length, directory flag, path, modification time, generation, content type and MD5 hash
+     * always come from the basic status. Access time, owner, storage class, metadata
+     * modification time and the metadata map come from the Jindo status when it is present and
+     * fall back to the basic status when it is {@code null}.
+     */
+    private static class ExtendedFileStatus implements FileStatus {
+
+        private final FileStatus basic;
+        @Nullable private final JindoHadoopFileStatus meta;
+
+        /**
+         * @param basic status every accessor can fall back to
+         * @param meta additional Jindo status, or {@code null} if none is available
+         */
+        private ExtendedFileStatus(FileStatus basic, @Nullable JindoHadoopFileStatus meta) {
+            this.basic = basic;
+            this.meta = meta;
+        }
+
+        @Override
+        public long getLen() {
+            return basic.getLen();
+        }
+
+        @Override
+        public boolean isDir() {
+            return basic.isDir();
+        }
+
+        @Override
+        public Path getPath() {
+            return basic.getPath();
+        }
+
+        @Override
+        public long getModificationTime() {
+            return basic.getModificationTime();
+        }
+
+        @Override
+        public long getAccessTime() {
+            if (meta == null) {
+                return basic.getAccessTime();
+            }
+            return meta.getAccessTime();
+        }
+
+        @Nullable
+        @Override
+        public String getOwner() {
+            if (meta == null) {
+                return basic.getOwner();
+            }
+            return meta.getOwner();
+        }
+
+        @Nullable
+        @Override
+        public Integer getGeneration() {
+            return basic.getGeneration();
+        }
+
+        @Nullable
+        @Override
+        public String getContentType() {
+            return basic.getContentType();
+        }
+
+        @Nullable
+        @Override
+        public String getStorageClass() {
+            if (meta == null) {
+                return basic.getStorageClass();
+            }
+            return meta.getStorageClassName();
+        }
+
+        @Nullable
+        @Override
+        public String getMd5Hash() {
+            return basic.getMd5Hash();
+        }
+
+        @Nullable
+        @Override
+        public Long getMetadataModificationTime() {
+            if (meta == null) {
+                return basic.getMetadataModificationTime();
+            }
+            // NOTE(review): this reports the Jindo status' plain modification time as the
+            // metadata modification time - confirm that this is the intended mapping.
+            return meta.getModificationTime();
+        }
+
+        /**
+         * Returns the extended attributes of the Jindo status as strings, or the basic status'
+         * metadata when no Jindo status is present.
+         */
+        @Nullable
+        @Override
+        public Map<String, String> getMetadata() {
+            if (meta == null) {
+                return basic.getMetadata();
+            }
+            Map<String, String> metaData = new HashMap<>();
+            // Decode with an explicit charset: new String(byte[]) would use the platform
+            // default, so the same attribute could read differently from one JVM to the next.
+            meta.getXAttrs()
+                    .forEach(
+                            (k, v) ->
+                                    metaData.put(
+                                            k,
+                                            new String(
+                                                    v, java.nio.charset.StandardCharsets.UTF_8)));
+            return metaData;
+        }
+    }
}