boroknagyz commented on code in PR #8561:
URL: https://github.com/apache/hadoop/pull/8561#discussion_r3458825183
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java:
##########
@@ -378,6 +383,72 @@ public FSDataInputStream open(PathHandle fd, int
bufferSize)
return dfs.createWrappedInputStream(dfsis);
}
+ @Override
+ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
+ final Path path,
+ final OpenFileParameters parameters) throws IOException {
+ AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
+ parameters.getMandatoryKeys(),
+ Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS,
+ "for " + path);
+ statistics.incrementReadOps(1);
+ storageStatistics.incrementOpCounter(OpType.OPEN);
+ final Path absF = fixRelativePart(path);
+ return LambdaUtils.eval(new CompletableFuture<>(), () -> {
+ LocatedBlocks locatedBlocks =
+ getLocatedBlocksFromStatus(parameters.getStatus());
+ final DFSInputStream dfsis;
+ if (locatedBlocks != null) {
+ dfsis = dfs.open(getPathName(absF), parameters.getBufferSize(),
+ verifyChecksum, locatedBlocks);
+ } else {
+ dfsis = dfs.open(getPathName(absF), parameters.getBufferSize(),
+ verifyChecksum);
+ }
+ return dfs.createWrappedInputStream(dfsis);
+ });
+ }
+
+ private static LocatedBlocks getLocatedBlocksFromStatus(FileStatus status) {
+ if (status instanceof HdfsLocatedFileStatus) {
+ return ((HdfsLocatedFileStatus) status).getLocatedBlocks();
+ }
+ return null;
+ }
+
+ /**
+ * Create a new input stream for the same file as an existing stream,
+ * reusing its cached block locations to avoid a NameNode RPC.
+ * The returned stream is independent (its own position, buffers, etc.)
+ * but shares the same block location metadata.
+ *
+ * @param existing an open input stream obtained from this filesystem
+ * @return a new independent input stream for the same file
+ * @throws IOException if the stream cannot be cloned
+ */
+ public FSDataInputStream cloneDataInputStream(FSDataInputStream existing)
+ throws IOException {
+ statistics.incrementReadOps(1);
+ storageStatistics.incrementOpCounter(OpType.OPEN);
+ InputStream wrapped = existing.getWrappedStream();
+ DFSInputStream dfsis;
+ if (wrapped instanceof DFSInputStream) {
+ dfsis = (DFSInputStream) wrapped;
+ } else if (wrapped instanceof org.apache.hadoop.crypto.CryptoInputStream) {
Review Comment:
Done.
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestOpenFileWithLocatedBlocks.java:
##########
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoInputStream;
+import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag;
+import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests that opening a file with pre-fetched block locations via the
+ * {@code openFile().withFileStatus()} builder API skips the NameNode RPC
+ * for block locations.
+ */
+public class TestOpenFileWithLocatedBlocks {
+ private static final String NN_METRICS = "NameNodeActivity";
+ private static final int BLOCK_SIZE = 1024;
+ private static final short REPLICATION = 1;
+ private static final String TEST_KEY = "test_key";
+ private static final EnumSet<CreateEncryptionZoneFlag> NO_TRASH =
+ EnumSet.of(CreateEncryptionZoneFlag.NO_TRASH);
+
+ private MiniDFSCluster cluster;
+ private DistributedFileSystem fs;
+ private Configuration conf;
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ conf = new HdfsConfiguration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ FileSystemTestHelper fsHelper = new FileSystemTestHelper();
+ File testRootDir = new File(fsHelper.getTestRootDir()).getAbsoluteFile();
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+ JavaKeyStoreProvider.SCHEME_NAME + "://file" +
+ new Path(testRootDir.toString(), "test.jks").toUri());
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1)
+ .build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ private Path createFile(String name, byte[] data) throws IOException {
Review Comment:
Thanks, switched to ContractTestUtils.
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestOpenFileWithLocatedBlocks.java:
##########
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoInputStream;
+import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag;
+import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests that opening a file with pre-fetched block locations via the
+ * {@code openFile().withFileStatus()} builder API skips the NameNode RPC
+ * for block locations.
+ */
+public class TestOpenFileWithLocatedBlocks {
+ private static final String NN_METRICS = "NameNodeActivity";
+ private static final int BLOCK_SIZE = 1024;
+ private static final short REPLICATION = 1;
+ private static final String TEST_KEY = "test_key";
+ private static final EnumSet<CreateEncryptionZoneFlag> NO_TRASH =
+ EnumSet.of(CreateEncryptionZoneFlag.NO_TRASH);
+
+ private MiniDFSCluster cluster;
+ private DistributedFileSystem fs;
+ private Configuration conf;
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ conf = new HdfsConfiguration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ FileSystemTestHelper fsHelper = new FileSystemTestHelper();
+ File testRootDir = new File(fsHelper.getTestRootDir()).getAbsoluteFile();
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+ JavaKeyStoreProvider.SCHEME_NAME + "://file" +
+ new Path(testRootDir.toString(), "test.jks").toUri());
+ cluster = new MiniDFSCluster.Builder(conf)
Review Comment:
Done.
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java:
##########
@@ -378,6 +383,72 @@ public FSDataInputStream open(PathHandle fd, int
bufferSize)
return dfs.createWrappedInputStream(dfsis);
}
+ @Override
+ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
+ final Path path,
+ final OpenFileParameters parameters) throws IOException {
+ AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
+ parameters.getMandatoryKeys(),
+ Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS,
+ "for " + path);
+ statistics.incrementReadOps(1);
+ storageStatistics.incrementOpCounter(OpType.OPEN);
+ final Path absF = fixRelativePart(path);
+ return LambdaUtils.eval(new CompletableFuture<>(), () -> {
+ LocatedBlocks locatedBlocks =
+ getLocatedBlocksFromStatus(parameters.getStatus());
+ final DFSInputStream dfsis;
+ if (locatedBlocks != null) {
+ dfsis = dfs.open(getPathName(absF), parameters.getBufferSize(),
+ verifyChecksum, locatedBlocks);
+ } else {
+ dfsis = dfs.open(getPathName(absF), parameters.getBufferSize(),
+ verifyChecksum);
+ }
+ return dfs.createWrappedInputStream(dfsis);
+ });
+ }
+
+ private static LocatedBlocks getLocatedBlocksFromStatus(FileStatus status) {
+ if (status instanceof HdfsLocatedFileStatus) {
+ return ((HdfsLocatedFileStatus) status).getLocatedBlocks();
+ }
+ return null;
+ }
+
+ /**
+ * Create a new input stream for the same file as an existing stream,
+ * reusing its cached block locations to avoid a NameNode RPC.
+ * The returned stream is independent (its own position, buffers, etc.)
+ * but shares the same block location metadata.
+ *
+ * @param existing an open input stream obtained from this filesystem
+ * @return a new independent input stream for the same file
+ * @throws IOException if the stream cannot be cloned
+ */
+ public FSDataInputStream cloneDataInputStream(FSDataInputStream existing)
+ throws IOException {
+ statistics.incrementReadOps(1);
+ storageStatistics.incrementOpCounter(OpType.OPEN);
+ InputStream wrapped = existing.getWrappedStream();
+ DFSInputStream dfsis;
+ if (wrapped instanceof DFSInputStream) {
+ dfsis = (DFSInputStream) wrapped;
+ } else if (wrapped instanceof org.apache.hadoop.crypto.CryptoInputStream) {
+ dfsis = (DFSInputStream)
+ ((org.apache.hadoop.crypto.CryptoInputStream) wrapped)
+ .getWrappedStream();
+ } else {
+ throw new IOException("Cannot clone: underlying stream is "
+ + wrapped.getClass().getName() + ", not a DFSInputStream");
+ }
+ LocatedBlocks locatedBlocks = dfsis.getLocatedBlocks();
+ String src = dfsis.getSrc();
+ DFSInputStream clone = dfs.open(src,
+ dfs.getConf().getIoBufferSize(), verifyChecksum, locatedBlocks);
+ return dfs.createWrappedInputStream(clone);
Review Comment:
Done.
##########
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/include/hdfs/hdfs.h:
##########
@@ -685,11 +685,27 @@ extern "C" {
int hdfsCloseFile(hdfsFS fs, hdfsFile file);
- /**
- * hdfsExists - Checks if a given path exsits on the filesystem
+ /**
+ * hdfsCloneFile - Create a new independent file handle for the same file
+ * as an existing handle, reusing cached block locations to avoid a
+ * NameNode RPC. The cloned handle has its own read position and buffers.
+ * Only works for input (read) streams on a DistributedFileSystem.
+ *
+ * @param fs The configured filesystem handle.
+ * @param file An open input file handle to clone.
+ * @return Returns a new file handle on success, NULL on error.
+ * On error, errno will be set appropriately.
+ * The returned handle must be closed with hdfsCloseFile.
+ */
+ LIBHDFS_EXTERNAL
+ hdfsFile hdfsCloneFile(hdfsFS fs, hdfsFile file);
+
+
+ /**
+ * hdfsExists - Checks if a given path exsits on the filesystem
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]