steveloughran commented on code in PR #3862:
URL: https://github.com/apache/hive/pull/3862#discussion_r1049979713
########## iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hadoop;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.functional.FutureIO;

Review Comment:
   Be aware this is 3.3.1+; we had one for internal use in 3.3.0 in o.a.h.fs.impl, and it was useful enough that we needed to make it public. Once you use it, you are Hadoop 3.3.1+ *only*.
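   For anyone who does need to keep building against Hadoop releases before 3.3.1, here is a minimal fallback sketch of doing the wait without FutureIO. `StreamAwaiter` and `awaitStream` are hypothetical names, not part of this PR; the unwrapping mirrors what FutureIO.awaitFuture does:

```java
// A hedged sketch, not part of the PR: block on the openFile() future
// without FutureIO, so the code still builds against Hadoop < 3.3.1.
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.FSDataInputStream;

final class StreamAwaiter {
  private StreamAwaiter() {
  }

  // Rough equivalent of FutureIO.awaitFuture(): wait for the future and
  // unwrap the ExecutionException back to the original IOException or
  // RuntimeException raised during the asynchronous open.
  static FSDataInputStream awaitStream(CompletableFuture<FSDataInputStream> future)
      throws IOException {
    try {
      return future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw (IOException) new InterruptedIOException("Interrupted opening stream").initCause(e);
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof IOException) {
        throw (IOException) cause;
      } else if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      }
      throw new IOException(cause != null ? cause : e);
    }
  }
}
```

   Calling `StreamAwaiter.awaitStream(fs.openFile(path).build())` then behaves like the FutureIO call in the patch, at the cost of re-implementing the unwrapping locally.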
########## iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hadoop;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.functional.FutureIO;
+import org.apache.iceberg.encryption.NativeFileCryptoParameters;
+import org.apache.iceberg.encryption.NativelyEncryptedFile;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * {@link InputFile} implementation using the Hadoop {@link FileSystem} API.
+ *
+ * <p>This class is based on Parquet's HadoopInputFile.
+ */
+public class HadoopInputFile implements InputFile, NativelyEncryptedFile {
+  public static final String[] NO_LOCATION_PREFERENCE = new String[0];
+
+  private final String location;
+  private final FileSystem fs;
+  private final Path path;
+  private final Configuration conf;
+  private FileStatus stat = null;
+  private Long length = null;
+  private NativeFileCryptoParameters nativeDecryptionParameters;
+
+  public static HadoopInputFile fromLocation(CharSequence location, Configuration conf) {
+    FileSystem fs = Util.getFs(new Path(location.toString()), conf);
+    return new HadoopInputFile(fs, location.toString(), conf);
+  }
+
+  public static HadoopInputFile fromLocation(
+      CharSequence location, long length, Configuration conf) {
+    FileSystem fs = Util.getFs(new Path(location.toString()), conf);
+    if (length > 0) {
+      return new HadoopInputFile(fs, location.toString(), length, conf);
+    } else {
+      return new HadoopInputFile(fs, location.toString(), conf);
+    }
+  }
+
+  public static HadoopInputFile fromLocation(CharSequence location, FileSystem fs) {
+    return new HadoopInputFile(fs, location.toString(), fs.getConf());
+  }
+
+  public static HadoopInputFile fromLocation(CharSequence location, long length, FileSystem fs) {
+    return new HadoopInputFile(fs, location.toString(), length, fs.getConf());
+  }
+
+  public static HadoopInputFile fromPath(Path path, Configuration conf) {
+    FileSystem fs = Util.getFs(path, conf);
+    return fromPath(path, fs, conf);
+  }
+
+  public static HadoopInputFile fromPath(Path path, long length, Configuration conf) {
+    FileSystem fs = Util.getFs(path, conf);
+    return fromPath(path, length, fs, conf);
+  }
+
+  public static HadoopInputFile fromPath(Path path, FileSystem fs) {
+    return fromPath(path, fs, fs.getConf());
+  }
+
+  public static HadoopInputFile fromPath(Path path, long length, FileSystem fs) {
+    return fromPath(path, length, fs, fs.getConf());
+  }
+
+  public static HadoopInputFile fromPath(Path path, FileSystem fs, Configuration conf) {
+    return new HadoopInputFile(fs, path, conf);
+  }
+
+  public static HadoopInputFile fromPath(
+      Path path, long length, FileSystem fs, Configuration conf) {
+    return new HadoopInputFile(fs, path, length, conf);
+  }
+
+  public static HadoopInputFile fromStatus(FileStatus stat, Configuration conf) {
+    FileSystem fs = Util.getFs(stat.getPath(), conf);
+    return fromStatus(stat, fs, conf);
+  }
+
+  public static HadoopInputFile fromStatus(FileStatus stat, FileSystem fs) {
+    return fromStatus(stat, fs, fs.getConf());
+  }
+
+  public static HadoopInputFile fromStatus(FileStatus stat, FileSystem fs, Configuration conf) {
+    return new HadoopInputFile(fs, stat, conf);
+  }
+
+  private HadoopInputFile(FileSystem fs, String location, Configuration conf) {
+    this.fs = fs;
+    this.location = location;
+    this.path = new Path(location);
+    this.conf = conf;
+  }
+
+  private HadoopInputFile(FileSystem fs, String location, long length, Configuration conf) {
+    Preconditions.checkArgument(length >= 0, "Invalid file length: %s", length);
+    this.fs = fs;
+    this.location = location;
+    this.path = new Path(location);
+    this.conf = conf;
+    this.length = length;
+  }
+
+  private HadoopInputFile(FileSystem fs, Path path, Configuration conf) {
+    this.fs = fs;
+    this.path = path;
+    this.location = path.toString();
+    this.conf = conf;
+  }
+
+  private HadoopInputFile(FileSystem fs, Path path, long length, Configuration conf) {
+    Preconditions.checkArgument(length >= 0, "Invalid file length: %s", length);
+    this.fs = fs;
+    this.path = path;
+    this.location = path.toString();
+    this.conf = conf;
+    this.length = length;
+  }
+
+  private HadoopInputFile(FileSystem fs, FileStatus stat, Configuration conf) {
+    this.fs = fs;
+    this.path = stat.getPath();
+    this.location = path.toString();
+    this.stat = stat;
+    this.conf = conf;
+    this.length = stat.getLen();
+  }
+
+  private FileStatus lazyStat() {
+    if (stat == null) {
+      try {
+        this.stat = fs.getFileStatus(path);
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to get status for file: %s", path);
+      }
+    }
+    return stat;
+  }
+
+  @Override
+  public long getLength() {
+    if (length == null) {
+      this.length = lazyStat().getLen();
+    }
+    return length;
+  }
+
+  @Override
+  public SeekableInputStream newStream() {
+    try {
+      return HadoopStreams.wrap(
+          FutureIO.awaitFuture(
+              fs.openFile(path).opt("fs.s3a.experimental.input.fadvise", "normal").build()));

Review Comment:
   1. If you have the length at this point, set it in fs.option.openfile.length; recent S3A releases skip their HEAD request once you tell them how long the file is.
   2. The openFile() future raises all IOEs as runtime IOEs, and awaitFuture() is a Future.get() with remapping, so you may be able to avoid this double catch and wrap.
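   A sketch of newStream() with both points applied. This assumes Hadoop 3.3.1+, the fields of the HadoopInputFile class above (fs, path, length), and an extra import of org.apache.hadoop.fs.FutureDataInputStreamBuilder; the catch mapping follows the exception imports already in the file (FileNotFoundException to NotFoundException), which may differ from the PR's actual handling since the quoted hunk is cut off at the review comment:

```java
// Hedged sketch of newStream() with the review suggestions applied;
// assumes the fields and imports of the HadoopInputFile class above,
// plus org.apache.hadoop.fs.FutureDataInputStreamBuilder (Hadoop 3.3.1+).
@Override
public SeekableInputStream newStream() {
  try {
    FutureDataInputStreamBuilder builder =
        fs.openFile(path).opt("fs.s3a.experimental.input.fadvise", "normal");
    if (length != null) {
      // Telling the store the file length lets recent S3A releases skip
      // the HEAD request they would otherwise issue to discover it.
      builder = builder.opt("fs.option.openfile.length", Long.toString(length));
    }
    // awaitFuture() unwraps the ExecutionException and rethrows the
    // original IOException, so a single catch layer here is enough.
    return HadoopStreams.wrap(FutureIO.awaitFuture(builder.build()));
  } catch (FileNotFoundException e) {
    throw new NotFoundException(e, "Failed to open input stream for file: %s", path);
  } catch (IOException e) {
    throw new RuntimeIOException(e, "Failed to open input stream for file: %s", path);
  }
}
```

   Guarding the length option on `length != null` matters because the field is only set when a caller used a length-taking factory method or a FileStatus; otherwise the store falls back to probing for the size itself.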
