Copilot commented on code in PR #8561: URL: https://github.com/apache/hadoop/pull/8561#discussion_r3455045489
########## hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestOpenFileWithLocatedBlocks.java: ########## @@ -0,0 +1,475 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs; + +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; + Review Comment: Remove unused static imports. Unused imports are not compiler errors in Java (javac accepts them), but they are reported by the project's Checkstyle UnusedImports check, so they should be cleaned up before merge. 
########## hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java: ########## @@ -378,6 +383,72 @@ public FSDataInputStream open(PathHandle fd, int bufferSize) return dfs.createWrappedInputStream(dfsis); } + @Override + protected CompletableFuture<FSDataInputStream> openFileWithOptions( + final Path path, + final OpenFileParameters parameters) throws IOException { + AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( + parameters.getMandatoryKeys(), + Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS, + "for " + path); + statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.OPEN); + final Path absF = fixRelativePart(path); + return LambdaUtils.eval(new CompletableFuture<>(), () -> { + LocatedBlocks locatedBlocks = + getLocatedBlocksFromStatus(parameters.getStatus()); + final DFSInputStream dfsis; + if (locatedBlocks != null) { + dfsis = dfs.open(getPathName(absF), parameters.getBufferSize(), + verifyChecksum, locatedBlocks); + } else { + dfsis = dfs.open(getPathName(absF), parameters.getBufferSize(), + verifyChecksum); + } + return dfs.createWrappedInputStream(dfsis); + }); Review Comment: openFileWithOptions() currently bypasses DistributedFileSystem#open(Path,int), so symlink resolution via FileSystemLinkResolver is skipped. Also, if dfs.createWrappedInputStream(dfsis) throws, dfsis is leaked because it isn't closed in this method (unlike open(Path,int) which closes on wrap failure). 
########## hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java: ########## @@ -378,6 +383,72 @@ public FSDataInputStream open(PathHandle fd, int bufferSize) return dfs.createWrappedInputStream(dfsis); } + @Override + protected CompletableFuture<FSDataInputStream> openFileWithOptions( + final Path path, + final OpenFileParameters parameters) throws IOException { + AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( + parameters.getMandatoryKeys(), + Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS, + "for " + path); + statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.OPEN); + final Path absF = fixRelativePart(path); + return LambdaUtils.eval(new CompletableFuture<>(), () -> { + LocatedBlocks locatedBlocks = + getLocatedBlocksFromStatus(parameters.getStatus()); + final DFSInputStream dfsis; + if (locatedBlocks != null) { + dfsis = dfs.open(getPathName(absF), parameters.getBufferSize(), + verifyChecksum, locatedBlocks); + } else { + dfsis = dfs.open(getPathName(absF), parameters.getBufferSize(), + verifyChecksum); + } + return dfs.createWrappedInputStream(dfsis); + }); + } + + private static LocatedBlocks getLocatedBlocksFromStatus(FileStatus status) { + if (status instanceof HdfsLocatedFileStatus) { + return ((HdfsLocatedFileStatus) status).getLocatedBlocks(); + } + return null; + } + + /** + * Create a new input stream for the same file as an existing stream, + * reusing its cached block locations to avoid a NameNode RPC. + * The returned stream is independent (its own position, buffers, etc.) + * but shares the same block location metadata. 
+ * + * @param existing an open input stream obtained from this filesystem + * @return a new independent input stream for the same file + * @throws IOException if the stream cannot be cloned + */ + public FSDataInputStream cloneDataInputStream(FSDataInputStream existing) + throws IOException { + statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.OPEN); + InputStream wrapped = existing.getWrappedStream(); + DFSInputStream dfsis; + if (wrapped instanceof DFSInputStream) { + dfsis = (DFSInputStream) wrapped; + } else if (wrapped instanceof org.apache.hadoop.crypto.CryptoInputStream) { + dfsis = (DFSInputStream) + ((org.apache.hadoop.crypto.CryptoInputStream) wrapped) + .getWrappedStream(); + } else { + throw new IOException("Cannot clone: underlying stream is " + + wrapped.getClass().getName() + ", not a DFSInputStream"); + } + LocatedBlocks locatedBlocks = dfsis.getLocatedBlocks(); + String src = dfsis.getSrc(); + DFSInputStream clone = dfs.open(src, + dfs.getConf().getIoBufferSize(), verifyChecksum, locatedBlocks); + return dfs.createWrappedInputStream(clone); Review Comment: cloneDataInputStream() should (1) verify the stream was opened by this DistributedFileSystem/DFSClient (to avoid cloning a stream from a different cluster instance) and (2) close the newly opened DFSInputStream if wrapping it (e.g., CryptoInputStream) fails, matching the error-handling pattern in open(Path,int). ########## hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java: ########## @@ -1078,6 +1078,24 @@ public DFSInputStream open(String src, int buffersize, boolean verifyChecksum) } } + /** + * Create an input stream using pre-fetched block locations, skipping the + * NameNode RPC to get block locations. 
+ * @param src file name + * @param buffersize ignored + * @param verifyChecksum verify checksums before returning data to client + * @param locatedBlocks pre-fetched block locations for this file + * @return an input stream for reading the file + * @throws IOException on I/O error + */ + public DFSInputStream open(String src, int buffersize, boolean verifyChecksum, + LocatedBlocks locatedBlocks) throws IOException { + checkOpen(); + try (TraceScope ignored = newPathTraceScope("newDFSInputStream", src)) { + return openInternal(locatedBlocks, src, verifyChecksum); + } + } Review Comment: The new DFSClient#open(String,int,boolean,LocatedBlocks) overload will currently throw a generic IOException("Cannot open filename ...") if locatedBlocks is null (via openInternal). This is a confusing failure mode for a caller; explicitly validate the parameter and fail fast with IllegalArgumentException. ########## hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/include/hdfs/hdfs.h: ########## @@ -685,11 +685,27 @@ extern "C" { int hdfsCloseFile(hdfsFS fs, hdfsFile file); - /** - * hdfsExists - Checks if a given path exsits on the filesystem + /** + * hdfsCloneFile - Create a new independent file handle for the same file + * as an existing handle, reusing cached block locations to avoid a + * NameNode RPC. The cloned handle has its own read position and buffers. + * Only works for input (read) streams on a DistributedFileSystem. + * + * @param fs The configured filesystem handle. + * @param file An open input file handle to clone. + * @return Returns a new file handle on success, NULL on error. + * On error, errno will be set appropriately. + * The returned handle must be closed with hdfsCloseFile. + */ + LIBHDFS_EXTERNAL + hdfsFile hdfsCloneFile(hdfsFS fs, hdfsFile file); + + + /** + * hdfsExists - Checks if a given path exsits on the filesystem Review Comment: Typo in comment: "exsits" → "exists". -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
