This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-0.15.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit b9ffa976c0ad8653d4bd9546a9853e5ea8347f85 Author: Y Ethan Guo <[email protected]> AuthorDate: Sat May 25 20:21:19 2024 -0700 [HUDI-7796] Gracefully cast file system instance in Avro writers (#11304) --- .../org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java | 12 ++++++++---- .../java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java | 11 ++++++++--- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java index d3d66b5c978..c23cb438310 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java @@ -35,6 +35,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.KeyValue; @@ -68,7 +69,8 @@ public class HoodieAvroHFileWriter private static AtomicLong recordIndex = new AtomicLong(1); private final Path file; private HoodieHFileConfig hfileConfig; - private final HoodieWrapperFileSystem fs; + private final boolean isWrapperFileSystem; + private final Option<HoodieWrapperFileSystem> wrapperFs; private final long maxFileSize; private final String instantTime; private final TaskContextSupplier taskContextSupplier; @@ -88,7 +90,9 @@ public class HoodieAvroHFileWriter Configuration conf = HadoopFSUtils.registerFileSystem(file, hfileConfig.getHadoopConf()); this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf); - this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf); + FileSystem fs = this.file.getFileSystem(conf); + this.isWrapperFileSystem = fs instanceof HoodieWrapperFileSystem; + this.wrapperFs = this.isWrapperFileSystem ? Option.of((HoodieWrapperFileSystem) fs) : Option.empty(); this.hfileConfig = hfileConfig; this.schema = schema; this.keyFieldSchema = Option.ofNullable(schema.getField(hfileConfig.getKeyFieldName())); @@ -114,7 +118,7 @@ public class HoodieAvroHFileWriter String.valueOf(hfileConfig.shouldDropBehindCacheCompaction())); CacheConfig cacheConfig = new CacheConfig(conf); this.writer = HFile.getWriterFactory(conf, cacheConfig) - .withPath(this.fs, this.file) + .withPath(fs, this.file) .withFileContext(context) .create(); @@ -136,7 +140,7 @@ public class HoodieAvroHFileWriter @Override public boolean canWrite() { - return fs.getBytesWritten(file) < maxFileSize; + return !isWrapperFileSystem || wrapperFs.get().getBytesWritten(file) < maxFileSize; } @Override diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java index 3ecc8fcd450..0516caad9ee 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.util.AvroOrcUtils; +import org.apache.hudi.common.util.Option; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem; import org.apache.hudi.io.storage.HoodieAvroFileWriter; @@ -35,6 +36,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; @@ -61,7 +63,8 @@ public class HoodieAvroOrcWriter implements HoodieAvroFileWriter, Closeable { private final Writer writer; private final Path file; - private final HoodieWrapperFileSystem fs; + private final boolean isWrapperFileSystem; + private final Option<HoodieWrapperFileSystem> wrapperFs; private final String instantTime; private final TaskContextSupplier taskContextSupplier; @@ -74,7 +77,9 @@ public class HoodieAvroOrcWriter implements HoodieAvroFileWriter, Closeable { Configuration conf = HadoopFSUtils.registerFileSystem(file, config.getStorageConf().unwrapAs(Configuration.class)); this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf); - this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf); + FileSystem fs = this.file.getFileSystem(conf); + this.isWrapperFileSystem = fs instanceof HoodieWrapperFileSystem; + this.wrapperFs = this.isWrapperFileSystem ? Option.of((HoodieWrapperFileSystem) fs) : Option.empty(); this.instantTime = instantTime; this.taskContextSupplier = taskContextSupplier; @@ -104,7 +109,7 @@ public class HoodieAvroOrcWriter implements HoodieAvroFileWriter, Closeable { @Override public boolean canWrite() { - return fs.getBytesWritten(file) < maxFileSize; + return !isWrapperFileSystem || wrapperFs.get().getBytesWritten(file) < maxFileSize; } @Override
