This is an automated email from the ASF dual-hosted git repository.

shahrs87 pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git

The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new a665fe145841 HDFS-17299. Adding rack failure tolerance when creating a new file (#6614)

a665fe145841 is described below

commit a665fe145841f1ad16f47981e65d3f244799ff99
Author: ritegarg <58840065+riteg...@users.noreply.github.com>
AuthorDate: Sun Mar 17 21:23:04 2024 -0700

    HDFS-17299. Adding rack failure tolerance when creating a new file (#6614)
---
 .../java/org/apache/hadoop/hdfs/DataStreamer.java  |  56 +++++---
 .../hadoop/hdfs/server/datanode/BlockReceiver.java |   5 +-
 .../server/datanode/fsdataset/FsDatasetSpi.java    |  12 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java     |  33 ++++-
 .../hadoop/hdfs/TestDistributedFileSystem.java     | 152 ++++++++++++++++++++-
 .../hdfs/server/datanode/SimulatedFSDataset.java   |   6 +
 .../datanode/extdataset/ExternalDatasetImpl.java   |   6 +
 7 files changed, 245 insertions(+), 25 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index fb7568dbfe54..a02444fa596a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -94,6 +94,7 @@
 import com.google.common.cache.LoadingCache;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Iterables;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -706,8 +707,8 @@ class DataStreamer extends Daemon {
 
         // get new block from namenode.
         if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
-          LOG.debug("Allocating new block");
-          setPipeline(nextBlockOutputStream());
+          LOG.debug("Allocating new block: {}", this);
+          setupPipelineForCreate();
           initDataStreaming();
         } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
           LOG.debug("Append to block {}", block);
@@ -1241,7 +1242,8 @@ class DataStreamer extends Daemon {
         streamerClosed = true;
         return false;
       }
-      boolean doSleep = setupPipelineForAppendOrRecovery();
+
+      setupPipelineForAppendOrRecovery();
 
       if (!streamerClosed && dfsClient.clientRunning) {
         if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
@@ -1275,7 +1277,7 @@ class DataStreamer extends Daemon {
         }
       }
 
-      return doSleep;
+      return false;
     }
 
     void setHflush() {
@@ -1449,9 +1451,11 @@ class DataStreamer extends Daemon {
    * it can be written to.
    * This happens when a file is appended or data streaming fails
    * It keeps on trying until a pipeline is setup
+   *
+   * Returns boolean whether pipeline was setup successfully or not.
+   * This boolean is used upstream on whether to continue creating pipeline or throw exception
    */
   private boolean setupPipelineForAppendOrRecovery() throws IOException {
-    // check number of datanodes
     if (nodes == null || nodes.length == 0) {
       String msg = "Could not get block locations. " +
           "Source file \"" + src + "\" - Aborting...";
@@ -1463,23 +1467,35 @@ class DataStreamer extends Daemon {
 
     boolean success = false;
     long newGS = 0L;
+    boolean isCreateStage = BlockConstructionStage.PIPELINE_SETUP_CREATE == stage;
     while (!success && !streamerClosed && dfsClient.clientRunning) {
       if (!handleRestartingDatanode()) {
         return false;
       }
 
-      final boolean isRecovery = errorState.hasError();
+      final boolean isRecovery = errorState.hasError() && !isCreateStage;
+
       if (!handleBadDatanode()) {
         return false;
       }
 
       handleDatanodeReplacement();
 
+      // During create stage, min replication should still be satisfied.
+      if (isCreateStage && !(dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 &&
+          nodes.length >= dfsClient.dtpReplaceDatanodeOnFailureReplication)) {
+        return false;
+      }
+
       // get a new generation stamp and an access token
       final LocatedBlock lb = updateBlockForPipeline();
       newGS = lb.getBlock().getGenerationStamp();
       accessToken = lb.getBlockToken();
 
+      if (isCreateStage) {
+        block.setCurrentBlock(lb.getBlock());
+      }
+
       // set up the pipeline again with the remaining nodes
       success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
@@ -1491,7 +1507,7 @@ class DataStreamer extends Daemon {
     if (success) {
       updatePipeline(newGS);
     }
-    return false; // do not sleep, continue processing
+    return success;
   }
 
   /**
@@ -1629,10 +1645,10 @@ class DataStreamer extends Daemon {
    * Must get block ID and the IDs of the destinations from the namenode.
    * Returns the list of target datanodes.
    */
-  protected LocatedBlock nextBlockOutputStream() throws IOException {
+  protected void setupPipelineForCreate() throws IOException {
     LocatedBlock lb;
     DatanodeInfo[] nodes;
-    StorageType[] storageTypes;
+    StorageType[] nextStorageTypes;
     int count = dfsClient.getConf().getNumBlockWriteRetry();
     boolean success;
     final ExtendedBlock oldBlock = block.getCurrentBlock();
@@ -1640,6 +1656,7 @@ class DataStreamer extends Daemon {
       errorState.reset();
       lastException.clear();
       success = false;
+      streamerClosed = false;
 
       DatanodeInfo[] excluded = getExcludedNodes();
       lb = locateFollowingBlock(
@@ -1649,26 +1666,33 @@ class DataStreamer extends Daemon {
       bytesSent = 0;
       accessToken = lb.getBlockToken();
       nodes = lb.getLocations();
-      storageTypes = lb.getStorageTypes();
-
-      // Connect to first DataNode in the list.
-      success = createBlockOutputStream(nodes, storageTypes, 0L, false);
-
+      nextStorageTypes = lb.getStorageTypes();
+      setPipeline(lb);
+      try {
+        // Connect to first DataNode in the list.
+        success = createBlockOutputStream(nodes, nextStorageTypes, 0L, false)
+            || setupPipelineForAppendOrRecovery();
+      } catch(IOException ie) {
+        LOG.warn("Exception in setupPipelineForCreate " + this, ie);
+        success = false;
+      }
       if (!success) {
         LOG.warn("Abandoning " + block);
         dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
             stat.getFileId(), src, dfsClient.clientName);
         block.setCurrentBlock(null);
-        final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
+        final DatanodeInfo badNode = errorState.getBadNodeIndex() == -1
+            ? Iterables.getLast(failed)
+            : nodes[errorState.getBadNodeIndex()];
         LOG.warn("Excluding datanode " + badNode);
         excludedNodes.put(badNode, badNode);
+        setPipeline(null, null, null);
       }
     } while (!success && --count >= 0);
 
     if (!success) {
       throw new IOException("Unable to create new block.");
     }
-    return lb;
   }
 
   // connects to the first datanode in the pipeline
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 7f381b1775ee..9f98aea193c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -212,7 +212,10 @@ class BlockReceiver implements Closeable {
       } else {
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
-          replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist);
+          replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist, newGs);
+          if (newGs != 0L) {
+            block.setGenerationStamp(newGs);
+          }
           datanode.notifyNamenodeReceivingBlock(
               block, replicaHandler.getReplica().getStorageUuid());
           break;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index d29d7722618a..d23405bd726c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -333,6 +333,16 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   ReplicaHandler createRbw(StorageType storageType,
       ExtendedBlock b, boolean allowLazyPersist) throws IOException;
 
+  /**
+   * Creates a RBW replica and returns the meta info of the replica
+   *
+   * @param b block
+   * @return the meta info of the replica which is being written to
+   * @throws IOException if an error occurs
+   */
+  ReplicaHandler createRbw(StorageType storageType,
+      ExtendedBlock b, boolean allowLazyPersist, long newGS) throws IOException;
+
   /**
    * Recovers a RBW replica and returns the meta info of the replica.
    *
@@ -466,7 +476,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   boolean isValidRbw(ExtendedBlock b);
 
   /**
-   * Invalidates the specified blocks
+   * Invalidates the specified blocks.
    * @param bpid Block pool Id
    * @param invalidBlks - the blocks to be invalidated
    * @throws IOException
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 81b0d67c3823..1764c38827d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1472,13 +1472,29 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public ReplicaHandler createRbw(
       StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
       throws IOException {
+    return createRbw(storageType, b, allowLazyPersist, 0L);
+  }
+
+  @Override // FsDatasetSpi
+  public ReplicaHandler createRbw(
+      StorageType storageType, ExtendedBlock b,
+      boolean allowLazyPersist, long newGS) throws IOException {
     try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
       ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
           b.getBlockId());
       if (replicaInfo != null) {
-        throw new ReplicaAlreadyExistsException("Block " + b +
-            " already exists in state " + replicaInfo.getState() +
-            " and thus cannot be created.");
+        // In case of retries with same blockPoolId + blockId as before
+        // with updated GS, cleanup the old replica to avoid
+        // any multiple copies with same blockPoolId + blockId
+        if (newGS != 0L) {
+          cleanupReplica(replicaInfo, replicaInfo.getBlockFile(), replicaInfo.getMetaFile(),
+              replicaInfo.getBlockFile().length(), replicaInfo.getMetaFile().length(),
+              b.getBlockPoolId());
+        } else {
+          throw new ReplicaAlreadyExistsException("Block " + b +
+              " already exists in state " + replicaInfo.getState() +
+              " and thus cannot be created.");
+        }
       }
       // create a new block
       FsVolumeReference ref = null;
@@ -3198,6 +3214,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         newReplicaInfo.isOnTransientStorage());
 
     // Remove the old replicas
+    cleanupReplica(replicaInfo, blockFile, metaFile, blockFileUsed, metaFileUsed, bpid);
+
+    // If deletion failed then the directory scanner will cleanup the blocks
+    // eventually.
+  }
+
+  private void cleanupReplica(ReplicaInfo replicaInfo, File blockFile, File metaFile,
+      long blockFileUsed, long metaFileUsed, final String bpid) {
     if (blockFile.delete() || !blockFile.exists()) {
       FsVolumeImpl volume = (FsVolumeImpl) replicaInfo.getVolume();
       volume.onBlockFileDeletion(bpid, blockFileUsed);
@@ -3205,9 +3229,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       volume.onMetaFileDeletion(bpid, metaFileUsed);
     }
   }
-
-  // If deletion failed then the directory scanner will cleanup the blocks
-  // eventually.
 }
 
 class LazyWriter implements Runnable {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
index e98733c01b34..633d3daa1171 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
@@ -54,7 +54,6 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStorageLocation;
@@ -89,6 +88,8 @@ import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -1707,4 +1708,153 @@ public class TestDistributedFileSystem {
       assertEquals("Number of SSD should be 1 but was : " + numSSD, 1, numSSD);
     }
   }
+
+  @Test
+  public void testSingleRackFailureDuringPipelineSetupMinReplicationPossible() throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 2);
+    // 3 racks & 6 nodes. 1 per rack for 2 racks and 4 nodes in the 3rd rack
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3", "/rack3", "/rack3", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill all the DNs in the 3rd rack, so only 2 racks stays with 1 active DN each
+      cluster.stopDataNode(5);
+      cluster.stopDataNode(4);
+      cluster.stopDataNode(3);
+      cluster.stopDataNode(2);
+
+      // create a file with replication 3, for rack fault tolerant BPP,
+      DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L);
+    }
+  }
+
+  @Test
+  public void testSingleRackFailureDuringPipelineSetupMinReplicationImpossible()
+      throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class, BlockPlacementPolicy.class);
+    conf.setBoolean(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION, 3);
+    // 3 racks & 6 nodes. 1 per rack for 2 racks and 4 nodes in the 3rd rack
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3", "/rack3", "/rack3", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill all the DNs in the 3rd rack, so only 2 racks stays with 1 active DN each
+      cluster.stopDataNode(5);
+      cluster.stopDataNode(4);
+      cluster.stopDataNode(3);
+      cluster.stopDataNode(2);
+      boolean threw = false;
+      try {
+        DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L);
+      } catch (IOException e) {
+        threw = true;
+      }
+      assertTrue(threw);
+    }
+  }
+
+  @Test
+  public void testMultipleRackFailureDuringPipelineSetupMinReplicationPossible() throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 1);
+    // 3 racks & 6 nodes. 1 per rack for 2 racks and 4 nodes in the 3rd rack
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3", "/rack3", "/rack3", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill all DNs except 1, so only rack1 stays with 1 active DN
+      cluster.stopDataNode(5);
+      cluster.stopDataNode(4);
+      cluster.stopDataNode(3);
+      cluster.stopDataNode(2);
+      cluster.stopDataNode(1);
+      // create a file with replication 3, for rack fault tolerant BPP,
+      // it should allocate nodes in all 3 racks.
+      DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L);
+    }
+  }
+
+  @Test
+  public void testMultipleRackFailureDuringPipelineSetupMinReplicationImpossible()
+      throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 2);
+    // 3 racks & 6 nodes. 1 per rack for 2 racks and 4 nodes in the 3rd rack
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3", "/rack3", "/rack3", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill all DNs except 1, so only rack1 stays with 1 active DN
+      cluster.stopDataNode(5);
+      cluster.stopDataNode(4);
+      cluster.stopDataNode(3);
+      cluster.stopDataNode(2);
+      cluster.stopDataNode(1);
+      boolean threw = false;
+      try {
+        DFSTestUtil.createFile(fs, new Path("/testFile"),
+            1024L, (short) 3, 1024L);
+      } catch (IOException e) {
+        threw = true;
+      }
+      assertTrue(threw);
+    }
+  }
+
+  @Test
+  public void testAllRackFailureDuringPipelineSetup() throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    // 3 racks & 6 nodes. 1 per rack for 2 racks and 4 nodes in the 3rd rack
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3", "/rack3", "/rack3", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // shutdown all DNs
+      cluster.shutdownDataNodes();
+      // create a file with replication 3, for rack fault tolerant BPP,
+      // it should allocate nodes in all 3 rack but fail because no DNs are present.
+      boolean threw = false;
+      try {
+        DFSTestUtil.createFile(fs, new Path("/testFile"),
+            1024L, (short) 3, 1024L);
+      } catch (IOException e) {
+        threw = true;
+      }
+      assertTrue(threw);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index b4526fb45e78..4a054c51db91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1103,6 +1103,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     return createTemporary(storageType, b, false);
   }
 
+  @Override
+  public ReplicaHandler createRbw(StorageType storageType,
+      ExtendedBlock b, boolean allowLazyPersist, long newGS) throws IOException {
+    return createRbw(storageType, b, allowLazyPersist);
+  }
+
   @Override // FsDatasetSpi
   public synchronized ReplicaHandler createTemporary(StorageType storageType,
       ExtendedBlock b, boolean isTransfer) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 1f03f5084bce..2486d1db543d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -148,6 +148,12 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
     return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
   }
 
+  @Override
+  public ReplicaHandler createRbw(StorageType storageType,
+      ExtendedBlock b, boolean allowLazyPersist, long newGS) throws IOException {
+    return createRbw(storageType, b, allowLazyPersist);
+  }
+
   @Override
   public ReplicaHandler recoverRbw(ExtendedBlock b, long newGS, long minBytesRcvd,
       long maxBytesRcvd) throws IOException {
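
A note on trying the change out: the new tests drive everything through MiniDFSCluster, but the
same knobs apply to an ordinary client. The sketch below is illustrative only and not part of the
commit; the class name, target path, and payload are placeholders, and the block placement policy
key is read by the NameNode, so outside a test it belongs in the cluster's hdfs-site.xml rather
than in client code. With datanode replacement disabled and MIN_REPLICATION set to 2, a
create-stage pipeline now keeps going through failures during setup as long as at least two
datanodes remain in the pipeline.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hdfs.DFSConfigKeys;
  import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
  import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
  import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;

  public class RackFaultTolerantWriteExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // NameNode-side: spread replicas across racks (same policy the tests install).
      conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
          BlockPlacementPolicyRackFaultTolerant.class, BlockPlacementPolicy.class);
      // Client-side: do not look for replacement datanodes when pipeline setup fails...
      conf.setBoolean(
          HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, false);
      // ...but only proceed with a create pipeline that still has >= 2 datanodes.
      conf.setInt(
          HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION, 2);

      // Write a small file with replication 3; with the fix, losing a whole rack
      // during pipeline setup no longer aborts the create outright.
      try (FileSystem fs = FileSystem.get(conf);
           FSDataOutputStream out = fs.create(new Path("/testFile"), (short) 3)) {
        out.writeBytes("placeholder payload");
      }
    }
  }

Setting MIN_REPLICATION to 1 corresponds to the "multiple rack failure" tests above, while any
value larger than the number of surviving pipeline nodes makes the create fail with "Unable to
create new block.", as exercised by the "Impossible" tests.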