This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new 064235decce [HUDI-7796] Gracefully cast file system instance in Avro writers (#11304)
064235decce is described below

commit 064235decced2fb9d04c5e2e85a6472469896dab
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sat May 25 20:21:19 2024 -0700

    [HUDI-7796] Gracefully cast file system instance in Avro writers (#11304)
---
 .../org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java     | 12 ++++++++----
 .../java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java  | 11 ++++++++---
 2 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
index d3d66b5c978..c23cb438310 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
@@ -35,6 +35,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
@@ -68,7 +69,8 @@ public class HoodieAvroHFileWriter
   private static AtomicLong recordIndex = new AtomicLong(1);
   private final Path file;
   private HoodieHFileConfig hfileConfig;
-  private final HoodieWrapperFileSystem fs;
+  private final boolean isWrapperFileSystem;
+  private final Option<HoodieWrapperFileSystem> wrapperFs;
   private final long maxFileSize;
   private final String instantTime;
   private final TaskContextSupplier taskContextSupplier;
@@ -88,7 +90,9 @@ public class HoodieAvroHFileWriter
 
     Configuration conf = HadoopFSUtils.registerFileSystem(file, hfileConfig.getHadoopConf());
     this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf);
-    this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf);
+    FileSystem fs = this.file.getFileSystem(conf);
+    this.isWrapperFileSystem = fs instanceof HoodieWrapperFileSystem;
+    this.wrapperFs = this.isWrapperFileSystem ? Option.of((HoodieWrapperFileSystem) fs) : Option.empty();
     this.hfileConfig = hfileConfig;
     this.schema = schema;
     this.keyFieldSchema = Option.ofNullable(schema.getField(hfileConfig.getKeyFieldName()));
@@ -114,7 +118,7 @@ public class HoodieAvroHFileWriter
         String.valueOf(hfileConfig.shouldDropBehindCacheCompaction()));
     CacheConfig cacheConfig = new CacheConfig(conf);
     this.writer = HFile.getWriterFactory(conf, cacheConfig)
-        .withPath(this.fs, this.file)
+        .withPath(fs, this.file)
         .withFileContext(context)
         .create();
 
@@ -136,7 +140,7 @@ public class HoodieAvroHFileWriter
 
   @Override
   public boolean canWrite() {
-    return fs.getBytesWritten(file) < maxFileSize;
+    return !isWrapperFileSystem || wrapperFs.get().getBytesWritten(file) < maxFileSize;
   }
 
   @Override
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java
index 3ecc8fcd450..0516caad9ee 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.util.AvroOrcUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.io.storage.HoodieAvroFileWriter;
@@ -35,6 +36,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -61,7 +63,8 @@ public class HoodieAvroOrcWriter implements HoodieAvroFileWriter, Closeable {
   private final Writer writer;
 
   private final Path file;
-  private final HoodieWrapperFileSystem fs;
+  private final boolean isWrapperFileSystem;
+  private final Option<HoodieWrapperFileSystem> wrapperFs;
   private final String instantTime;
   private final TaskContextSupplier taskContextSupplier;
 
@@ -74,7 +77,9 @@ public class HoodieAvroOrcWriter implements HoodieAvroFileWriter, Closeable {
 
     Configuration conf = HadoopFSUtils.registerFileSystem(file, config.getStorageConf().unwrapAs(Configuration.class));
     this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf);
-    this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf);
+    FileSystem fs = this.file.getFileSystem(conf);
+    this.isWrapperFileSystem = fs instanceof HoodieWrapperFileSystem;
+    this.wrapperFs = this.isWrapperFileSystem ? Option.of((HoodieWrapperFileSystem) fs) : Option.empty();
     this.instantTime = instantTime;
     this.taskContextSupplier = taskContextSupplier;
 
@@ -104,7 +109,7 @@ public class HoodieAvroOrcWriter implements HoodieAvroFileWriter, Closeable {
 
   @Override
   public boolean canWrite() {
-    return fs.getBytesWritten(file) < maxFileSize;
+    return !isWrapperFileSystem || wrapperFs.get().getBytesWritten(file) < maxFileSize;
   }
 
   @Override

Reply via email to