This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new 064235decce [HUDI-7796] Gracefully cast file system instance in Avro writers (#11304)
064235decce is described below
commit 064235decced2fb9d04c5e2e85a6472469896dab
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sat May 25 20:21:19 2024 -0700
[HUDI-7796] Gracefully cast file system instance in Avro writers (#11304)
---
.../org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java | 12 ++++++++----
.../java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java | 11 ++++++++---
2 files changed, 16 insertions(+), 7 deletions(-)
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
index d3d66b5c978..c23cb438310 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
@@ -35,6 +35,7 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.KeyValue;
@@ -68,7 +69,8 @@ public class HoodieAvroHFileWriter
private static AtomicLong recordIndex = new AtomicLong(1);
private final Path file;
private HoodieHFileConfig hfileConfig;
- private final HoodieWrapperFileSystem fs;
+ private final boolean isWrapperFileSystem;
+ private final Option<HoodieWrapperFileSystem> wrapperFs;
private final long maxFileSize;
private final String instantTime;
private final TaskContextSupplier taskContextSupplier;
@@ -88,7 +90,9 @@ public class HoodieAvroHFileWriter
Configuration conf = HadoopFSUtils.registerFileSystem(file, hfileConfig.getHadoopConf());
this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf);
- this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf);
+ FileSystem fs = this.file.getFileSystem(conf);
+ this.isWrapperFileSystem = fs instanceof HoodieWrapperFileSystem;
+ this.wrapperFs = this.isWrapperFileSystem ? Option.of((HoodieWrapperFileSystem) fs) : Option.empty();
this.hfileConfig = hfileConfig;
this.schema = schema;
this.keyFieldSchema = Option.ofNullable(schema.getField(hfileConfig.getKeyFieldName()));
@@ -114,7 +118,7 @@ public class HoodieAvroHFileWriter
String.valueOf(hfileConfig.shouldDropBehindCacheCompaction()));
CacheConfig cacheConfig = new CacheConfig(conf);
this.writer = HFile.getWriterFactory(conf, cacheConfig)
- .withPath(this.fs, this.file)
+ .withPath(fs, this.file)
.withFileContext(context)
.create();
@@ -136,7 +140,7 @@ public class HoodieAvroHFileWriter
@Override
public boolean canWrite() {
- return fs.getBytesWritten(file) < maxFileSize;
+ return !isWrapperFileSystem || wrapperFs.get().getBytesWritten(file) < maxFileSize;
}
@Override
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java
index 3ecc8fcd450..0516caad9ee 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.util.AvroOrcUtils;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
import org.apache.hudi.io.storage.HoodieAvroFileWriter;
@@ -35,6 +36,7 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -61,7 +63,8 @@ public class HoodieAvroOrcWriter implements HoodieAvroFileWriter, Closeable {
private final Writer writer;
private final Path file;
- private final HoodieWrapperFileSystem fs;
+ private final boolean isWrapperFileSystem;
+ private final Option<HoodieWrapperFileSystem> wrapperFs;
private final String instantTime;
private final TaskContextSupplier taskContextSupplier;
@@ -74,7 +77,9 @@ public class HoodieAvroOrcWriter implements HoodieAvroFileWriter, Closeable {
Configuration conf = HadoopFSUtils.registerFileSystem(file, config.getStorageConf().unwrapAs(Configuration.class));
this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf);
- this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf);
+ FileSystem fs = this.file.getFileSystem(conf);
+ this.isWrapperFileSystem = fs instanceof HoodieWrapperFileSystem;
+ this.wrapperFs = this.isWrapperFileSystem ? Option.of((HoodieWrapperFileSystem) fs) : Option.empty();
this.instantTime = instantTime;
this.taskContextSupplier = taskContextSupplier;
@@ -104,7 +109,7 @@ public class HoodieAvroOrcWriter implements HoodieAvroFileWriter, Closeable {
@Override
public boolean canWrite() {
- return fs.getBytesWritten(file) < maxFileSize;
+ return !isWrapperFileSystem || wrapperFs.get().getBytesWritten(file) < maxFileSize;
}
@Override