steveloughran commented on code in PR #8561:
URL: https://github.com/apache/hadoop/pull/8561#discussion_r3454963767
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java:
##########
@@ -378,6 +383,72 @@ public FSDataInputStream open(PathHandle fd, int
bufferSize)
return dfs.createWrappedInputStream(dfsis);
}
+ @Override
+ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
+ final Path path,
+ final OpenFileParameters parameters) throws IOException {
+ AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
+ parameters.getMandatoryKeys(),
+ Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS,
+ "for " + path);
+ statistics.incrementReadOps(1);
+ storageStatistics.incrementOpCounter(OpType.OPEN);
+ final Path absF = fixRelativePart(path);
+ return LambdaUtils.eval(new CompletableFuture<>(), () -> {
+ LocatedBlocks locatedBlocks =
+ getLocatedBlocksFromStatus(parameters.getStatus());
+ final DFSInputStream dfsis;
+ if (locatedBlocks != null) {
+ dfsis = dfs.open(getPathName(absF), parameters.getBufferSize(),
+ verifyChecksum, locatedBlocks);
+ } else {
+ dfsis = dfs.open(getPathName(absF), parameters.getBufferSize(),
+ verifyChecksum);
+ }
+ return dfs.createWrappedInputStream(dfsis);
+ });
+ }
+
+ private static LocatedBlocks getLocatedBlocksFromStatus(FileStatus status) {
+ if (status instanceof HdfsLocatedFileStatus) {
+ return ((HdfsLocatedFileStatus) status).getLocatedBlocks();
+ }
+ return null;
+ }
+
+ /**
+ * Create a new input stream for the same file as an existing stream,
+ * reusing its cached block locations to avoid a NameNode RPC.
+ * The returned stream is independent (its own position, buffers, etc.)
+ * but shares the same block location metadata.
+ *
+ * @param existing an open input stream obtained from this filesystem
+ * @return a new independent input stream for the same file
+ * @throws IOException if the stream cannot be cloned
+ */
+ public FSDataInputStream cloneDataInputStream(FSDataInputStream existing)
+ throws IOException {
+ statistics.incrementReadOps(1);
+ storageStatistics.incrementOpCounter(OpType.OPEN);
+ InputStream wrapped = existing.getWrappedStream();
+ DFSInputStream dfsis;
+ if (wrapped instanceof DFSInputStream) {
+ dfsis = (DFSInputStream) wrapped;
+ } else if (wrapped instanceof org.apache.hadoop.crypto.CryptoInputStream) {
Review Comment:
you could use the Java 17 pattern-matching instanceof (type test plus variable
declaration) here; it's worth the effort, and IntelliJ does the work for you.
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestOpenFileWithLocatedBlocks.java:
##########
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoInputStream;
+import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag;
+import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests that opening a file with pre-fetched block locations via the
+ * {@code openFile().withFileStatus()} builder API skips the NameNode RPC
+ * for block locations.
+ */
+public class TestOpenFileWithLocatedBlocks {
+ private static final String NN_METRICS = "NameNodeActivity";
+ private static final int BLOCK_SIZE = 1024;
+ private static final short REPLICATION = 1;
+ private static final String TEST_KEY = "test_key";
+ private static final EnumSet<CreateEncryptionZoneFlag> NO_TRASH =
+ EnumSet.of(CreateEncryptionZoneFlag.NO_TRASH);
+
+ private MiniDFSCluster cluster;
+ private DistributedFileSystem fs;
+ private Configuration conf;
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ conf = new HdfsConfiguration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ FileSystemTestHelper fsHelper = new FileSystemTestHelper();
+ File testRootDir = new File(fsHelper.getTestRootDir()).getAbsoluteFile();
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+ JavaKeyStoreProvider.SCHEME_NAME + "://file" +
+ new Path(testRootDir.toString(), "test.jks").toUri());
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1)
+ .build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ private Path createFile(String name, byte[] data) throws IOException {
Review Comment:
org.apache.hadoop.fs.contract.ContractTestUtils does this (and the readFully)
for you.
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestOpenFileWithLocatedBlocks.java:
##########
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoInputStream;
+import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag;
+import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests that opening a file with pre-fetched block locations via the
+ * {@code openFile().withFileStatus()} builder API skips the NameNode RPC
+ * for block locations.
+ */
+public class TestOpenFileWithLocatedBlocks {
+ private static final String NN_METRICS = "NameNodeActivity";
+ private static final int BLOCK_SIZE = 1024;
+ private static final short REPLICATION = 1;
+ private static final String TEST_KEY = "test_key";
+ private static final EnumSet<CreateEncryptionZoneFlag> NO_TRASH =
+ EnumSet.of(CreateEncryptionZoneFlag.NO_TRASH);
+
+ private MiniDFSCluster cluster;
+ private DistributedFileSystem fs;
+ private Configuration conf;
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ conf = new HdfsConfiguration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ FileSystemTestHelper fsHelper = new FileSystemTestHelper();
+ File testRootDir = new File(fsHelper.getTestRootDir()).getAbsoluteFile();
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+ JavaKeyStoreProvider.SCHEME_NAME + "://file" +
+ new Path(testRootDir.toString(), "test.jks").toUri());
+ cluster = new MiniDFSCluster.Builder(conf)
Review Comment:
HDFS tests must create the MiniDFSCluster once per test suite, not once per test case, so the tests run faster.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]