[
https://issues.apache.org/jira/browse/HIVE-26699?focusedWorklogId=833926&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-833926
]
ASF GitHub Bot logged work on HIVE-26699:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 15/Dec/22 18:03
Start Date: 15/Dec/22 18:03
Worklog Time Spent: 10m
Work Description: steveloughran commented on code in PR #3862:
URL: https://github.com/apache/hive/pull/3862#discussion_r1049979713
##########
iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hadoop;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.functional.FutureIO;
Review Comment:
Be aware this is 3.3.1+; we had one for internal use in 3.3.0 in
o.a.h.fs.impl, and it was useful enough that we made it public. Once you use
it, you are Hadoop 3.3.1+ *only*.
##########
iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hadoop;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.functional.FutureIO;
+import org.apache.iceberg.encryption.NativeFileCryptoParameters;
+import org.apache.iceberg.encryption.NativelyEncryptedFile;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * {@link InputFile} implementation using the Hadoop {@link FileSystem} API.
+ *
+ * <p>This class is based on Parquet's HadoopInputFile.
+ */
+public class HadoopInputFile implements InputFile, NativelyEncryptedFile {
+ public static final String[] NO_LOCATION_PREFERENCE = new String[0];
+
+ private final String location;
+ private final FileSystem fs;
+ private final Path path;
+ private final Configuration conf;
+ private FileStatus stat = null;
+ private Long length = null;
+ private NativeFileCryptoParameters nativeDecryptionParameters;
+
+ public static HadoopInputFile fromLocation(CharSequence location, Configuration conf) {
+ FileSystem fs = Util.getFs(new Path(location.toString()), conf);
+ return new HadoopInputFile(fs, location.toString(), conf);
+ }
+
+ public static HadoopInputFile fromLocation(
+ CharSequence location, long length, Configuration conf) {
+ FileSystem fs = Util.getFs(new Path(location.toString()), conf);
+ if (length > 0) {
+ return new HadoopInputFile(fs, location.toString(), length, conf);
+ } else {
+ return new HadoopInputFile(fs, location.toString(), conf);
+ }
+ }
+
+ public static HadoopInputFile fromLocation(CharSequence location, FileSystem fs) {
+ return new HadoopInputFile(fs, location.toString(), fs.getConf());
+ }
+
+ public static HadoopInputFile fromLocation(CharSequence location, long length, FileSystem fs) {
+ return new HadoopInputFile(fs, location.toString(), length, fs.getConf());
+ }
+
+ public static HadoopInputFile fromPath(Path path, Configuration conf) {
+ FileSystem fs = Util.getFs(path, conf);
+ return fromPath(path, fs, conf);
+ }
+
+ public static HadoopInputFile fromPath(Path path, long length, Configuration conf) {
+ FileSystem fs = Util.getFs(path, conf);
+ return fromPath(path, length, fs, conf);
+ }
+
+ public static HadoopInputFile fromPath(Path path, FileSystem fs) {
+ return fromPath(path, fs, fs.getConf());
+ }
+
+ public static HadoopInputFile fromPath(Path path, long length, FileSystem fs) {
+ return fromPath(path, length, fs, fs.getConf());
+ }
+
+ public static HadoopInputFile fromPath(Path path, FileSystem fs, Configuration conf) {
+ return new HadoopInputFile(fs, path, conf);
+ }
+
+ public static HadoopInputFile fromPath(
+ Path path, long length, FileSystem fs, Configuration conf) {
+ return new HadoopInputFile(fs, path, length, conf);
+ }
+
+ public static HadoopInputFile fromStatus(FileStatus stat, Configuration conf) {
+ FileSystem fs = Util.getFs(stat.getPath(), conf);
+ return fromStatus(stat, fs, conf);
+ }
+
+ public static HadoopInputFile fromStatus(FileStatus stat, FileSystem fs) {
+ return fromStatus(stat, fs, fs.getConf());
+ }
+
+ public static HadoopInputFile fromStatus(FileStatus stat, FileSystem fs, Configuration conf) {
+ return new HadoopInputFile(fs, stat, conf);
+ }
+
+ private HadoopInputFile(FileSystem fs, String location, Configuration conf) {
+ this.fs = fs;
+ this.location = location;
+ this.path = new Path(location);
+ this.conf = conf;
+ }
+
+ private HadoopInputFile(FileSystem fs, String location, long length, Configuration conf) {
+ Preconditions.checkArgument(length >= 0, "Invalid file length: %s", length);
+ this.fs = fs;
+ this.location = location;
+ this.path = new Path(location);
+ this.conf = conf;
+ this.length = length;
+ }
+
+ private HadoopInputFile(FileSystem fs, Path path, Configuration conf) {
+ this.fs = fs;
+ this.path = path;
+ this.location = path.toString();
+ this.conf = conf;
+ }
+
+ private HadoopInputFile(FileSystem fs, Path path, long length, Configuration conf) {
+ Preconditions.checkArgument(length >= 0, "Invalid file length: %s", length);
+ this.fs = fs;
+ this.path = path;
+ this.location = path.toString();
+ this.conf = conf;
+ this.length = length;
+ }
+
+ private HadoopInputFile(FileSystem fs, FileStatus stat, Configuration conf) {
+ this.fs = fs;
+ this.path = stat.getPath();
+ this.location = path.toString();
+ this.stat = stat;
+ this.conf = conf;
+ this.length = stat.getLen();
+ }
+
+ private FileStatus lazyStat() {
+ if (stat == null) {
+ try {
+ this.stat = fs.getFileStatus(path);
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to get status for file: %s", path);
+ }
+ }
+ return stat;
+ }
+
+ @Override
+ public long getLength() {
+ if (length == null) {
+ this.length = lazyStat().getLen();
+ }
+ return length;
+ }
+
+ @Override
+ public SeekableInputStream newStream() {
+ try {
+ return HadoopStreams.wrap(
+ FutureIO.awaitFuture(fs.openFile(path).opt("fs.s3a.experimental.input.fadvise", "normal").build()));
Review Comment:
1. If you have the length at this point, then set it in
fs.option.openfile.length; recent s3a releases skip their HEAD request once
you tell them how long the file is.
2. The openFile future raises all IOEs as runtime IOEs; awaitFuture() is a
Future.get() with exception remapping, so you may be able to avoid this
double catch-and-wrap.
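The reviewer's two suggestions could be sketched roughly as follows. This is a hypothetical rewrite of newStream(), not the actual patch; it assumes a Hadoop release where the fs.option.openfile.length option is recognized (older releases simply ignore unknown openFile options, so setting it is harmless):

```java
// Sketch only, assuming Hadoop 3.3.1+ (openFile builder and FutureIO):
@Override
public SeekableInputStream newStream() {
  try {
    FutureDataInputStreamBuilder builder = fs.openFile(path)
        .opt("fs.s3a.experimental.input.fadvise", "normal");
    if (length != null) {
      // Passing the known length lets recent s3a releases skip the HEAD probe.
      builder = builder.opt("fs.option.openfile.length", Long.toString(length));
    }
    // awaitFuture() is a Future.get() with exception remapping, so a single
    // catch is enough; no extra unwrap layer is needed.
    return HadoopStreams.wrap(FutureIO.awaitFuture(builder.build()));
  } catch (IOException e) {
    throw new RuntimeIOException(e, "Failed to open input stream for file: %s", path);
  }
}
```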
Issue Time Tracking
-------------------
Worklog Id: (was: 833926)
Time Spent: 1h 20m (was: 1h 10m)
> Iceberg: S3 fadvise can hurt JSON parsing significantly in DWX
> --------------------------------------------------------------
>
> Key: HIVE-26699
> URL: https://issues.apache.org/jira/browse/HIVE-26699
> Project: Hive
> Issue Type: Improvement
> Reporter: Rajesh Balamohan
> Priority: Major
> Labels: pull-request-available
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> Hive reads JSON metadata information (TableMetadataParser::read()) multiple
> times, e.g. during query compilation, AM split computation, stats
> computation, and during commits.
>
> With large JSON files (due to multiple inserts), reads take a lot longer on
> the S3 FS with "fs.s3a.experimental.input.fadvise" set to "random" (on the
> order of 10x). To be on the safe side, it would be good to set this to
> "normal" mode in configs when reading Iceberg tables.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)