[ https://issues.apache.org/jira/browse/HDFS-17658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17895763#comment-17895763 ]
Kevin Wikant edited comment on HDFS-17658 at 11/5/24 7:03 PM: -------------------------------------------------------------- h2. Test Methodology * Create a new HDFS cluster (with 0 blocks to begin with) * Run the test script to execute the desired test case * Check the namenode logs to identify which datanode(s) have the block replica(s) * Start decommissioning all the datanode(s) with block replica(s) * Check the namenode logs to check if the block replica(s) are considered by the DatanodeAdminManager & are replicated to other datanode(s) before the containing datanode(s) become decommissioned: ** For completed/finalized blocks, these are considered by DatanodeAdminManager & are replicated to other datanodes. ** For open blocks, these are not considered by DatanodeAdminManager & are not replicated to other datanodes. * Terminate the decommissioned datanode(s). Then validate if there was any HDFS data loss ** For completed/finalized blocks, they were replicated to other live datanodes & are therefore still accessible. ** For open blocks, they were not replicated to other live datanodes & were therefore lost when the decommissioned datanodes were terminated. Its worth noting here that the lost data includes: * any data the open DFSOutputStream has already appended. * any data from before the DFSOutputStream opened the existing block (i.e. data that was previously committed by another DFSOutputStream that has been closed already) Also, there are several additional DEBUG logs I have included for each test which help demonstrate what is the root cause of the issue. h2. Test Script {code:java} import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.fs.CreateFlag; import java.io.IOException; import java.io.OutputStream; import java.net.InetAddress; import java.net.URI; import java.nio.charset.StandardCharsets; import java.time.format.DateTimeFormatter; import java.time.Instant; import java.time.ZoneId; import java.util.EnumSet; import java.util.UUID; // Notes: // - need to uncomment the test you want to run in main method // - can execute as stand-alone class file if you add Hadoop dependencies to runtime classpath public class HDFSAppender { private static final String HDFS_URI = "hdfs://%s:8020"; private static final String HDFS_PATH = "/tmp/testfile-" + UUID.randomUUID().toString(); private static final int DATA_SIZE_KB = 100; private static final int BUFFER_SIZE_KB = 25; private static final int SLEEP_INTERVAL_SECONDS = 20; private static final short REPLICATION = (short) 1; private static final long BLOCK_SIZE = 256L * 1024 * 1024; private static final DateTimeFormatter TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.of("UTC")); private static long totalSize = 0; public static void main(String[] args) { try { URI hdfsURI = new URI(String.format(HDFS_URI, InetAddress.getLocalHost().getHostName())); Configuration conf = new Configuration(); DFSClient client = new DFSClient(hdfsURI, conf); // Uncomment the test you want to run // test1(client); // test2(client); // test3(client); // test4(client); } catch (Exception e) { e.printStackTrace(); } } // Test#1: Create Block & Repeatedly Append -> Decommission Datanode private static void test1(DFSClient client) throws Exception { DFSOutputStream outStream = createFile(client); while (true) { appendData(outStream); Thread.sleep(SLEEP_INTERVAL_SECONDS * 1000); } } // Test#2: Create Block & Repeatedly Append -> Close DFSOutputStream -> Decommission Datanode private static void test2(DFSClient client) throws Exception { DFSOutputStream outStream = createFile(client); appendData(outStream); appendData(outStream); appendData(outStream); outStream.close(); } // Test#3: Create Block & Repeatedly Append -> Close DFSOutputStream -> Re-open Block & Repeatedly Append in Loop -> Decommission Datanode private static void test3(DFSClient client) throws Exception { // Create DFSOutputStream createStream = createFile(client); appendData(createStream); appendData(createStream); appendData(createStream); createStream.close(); // Append DFSOutputStream outStream = openFileForAppend(client); while (true) { appendData(outStream); Thread.sleep(SLEEP_INTERVAL_SECONDS * 1000); } } // Test#4: Create Block & Repeatedly Append -> Close DFSOutputStream -> Re-open Block & Repeatedly Append -> Close DFSOutputStream -> Decommission Datanode private static void test4(DFSClient client) throws Exception { // Create DFSOutputStream createStream = createFile(client); appendData(createStream); appendData(createStream); appendData(createStream); createStream.close(); // Append DFSOutputStream outStream = openFileForAppend(client); appendData(outStream); appendData(outStream); appendData(outStream); outStream.close(); } private static DFSOutputStream createFile(DFSClient client) throws Exception { return (DFSOutputStream) client.create(HDFS_PATH, true, REPLICATION, BLOCK_SIZE); } private static DFSOutputStream openFileForAppend(DFSClient client) throws Exception { return (DFSOutputStream) ((HdfsDataOutputStream) client.append(HDFS_PATH, BUFFER_SIZE_KB * 1024, EnumSet.of(CreateFlag.APPEND), null, null)).getWrappedStream(); } private static void appendData(DFSOutputStream outStream) { try { String data = generateData(DATA_SIZE_KB * 1024); outStream.write(data.getBytes(StandardCharsets.UTF_8)); outStream.hflush(); totalSize += DATA_SIZE_KB; logLine(String.format("Append Success %d KB", totalSize)); } catch (Exception e) { logLine(String.format("Append Failed %d KB", totalSize)); e.printStackTrace(); } } private static String generateData(int size) { StringBuilder sb = new StringBuilder(); while (sb.length() < size) { sb.append("A"); } return sb.toString(); } private static void logLine(String line) { Instant now = Instant.now(); System.out.println(TIME_FORMAT.format(now) + " " + line); } } {code} was (Author: kevinwikant): h2. Test Methodology * Create a new HDFS cluster (with 0 blocks to begin with) * Run the test script to execute the desired test case * Check the namenode logs to identify which datanode(s) have the block replica(s) * Start decommissioning all the datanode(s) with block replica(s) * Check the namenode logs to check if the block replica(s) are considered by the DatanodeAdminManager & are replicated to other datanode(s) before the containing datanode(s) become decommissioned: ** For completed/finalized blocks, these are considered by DatanodeAdminManager & are replicated to other datanodes. ** For open blocks, these are not considered by DatanodeAdminManager & are not replicated to other datanodes. * Terminate the decommissioned datanode(s). Then validate if there was any HDFS data loss ** For completed/finalized blocks, they were replicated to other live datanodes & are therefore still accessible. ** For open blocks, they were not replicated to other live datanodes & were therefore lost when the decommissioned datanodes were terminated. Its worth noting here that the lost data includes: * any data the open DFSOutputStream has already appended. * any data from before the DFSOutputStream opened the existing block (i.e. data that was previously committed by another DFSOutputStream that has been closed already) Also, there are several additional DEBUG logs I have included for each test which help demonstrate what is the root cause of the issue. h2. Test Script {code:java} import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.fs.CreateFlag; import java.io.IOException; import java.io.OutputStream; import java.net.InetAddress; import java.net.URI; import java.nio.charset.StandardCharsets; import java.time.format.DateTimeFormatter; import java.time.Instant; import java.time.ZoneId; import java.util.EnumSet; import java.util.UUID; // Notes: // - need to uncomment the test you want to run in main method // - can execute as stand-alone class file if you add Hadoop dependencies to runtime classpath public class HDFSAppender { private static final String HDFS_URI = "hdfs://%s:8020"; private static final String HDFS_PATH = "/tmp/testfile-" + UUID.randomUUID().toString(); private static final int DATA_SIZE_KB = 100; private static final int BUFFER_SIZE_KB = 25; private static final int SLEEP_INTERVAL_SECONDS = 20; private static final short REPLICATION = (short) 1; private static final long BLOCK_SIZE = 256L * 1024 * 1024; private static final DateTimeFormatter TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.of("UTC")); private static long totalSize = 0; public static void main(String[] args) { try { URI hdfsURI = new URI(String.format(HDFS_URI, InetAddress.getLocalHost().getHostName())); Configuration conf = new Configuration(); DFSClient client = new DFSClient(hdfsURI, conf); // Uncomment the test you want to run // test1(client); // test2(client); // test3(client); // test4(client); } catch (Exception e) { e.printStackTrace(); } } // Test#1: Create Block & Repeatedly Append -> Decommission Datanode private static void test1(DFSClient client) throws Exception { DFSOutputStream outStream = createFile(client); while (true) { appendData(outStream); Thread.sleep(SLEEP_INTERVAL_SECONDS * 1000); } } // Test#2: Create Block & Repeatedly Append -> Close DFSOutputStream -> Decommission Datanode private static void test2(DFSClient client) throws Exception { DFSOutputStream outStream = createFile(client); appendData(outStream); appendData(outStream); appendData(outStream); outStream.close(); } // Test#3: Create Block & Repeatedly Append -> Close DFSOutputStream -> Re-open Block & Repeatedly Append in Loop -> Decommission Datanode private static void test3(DFSClient client) throws Exception { // Create DFSOutputStream createStream = createFile(client); appendData(createStream); appendData(createStream); appendData(createStream); createStream.close(); // Append DFSOutputStream outStream = openFileForAppend(client); while (true) { appendData(outStream); Thread.sleep(SLEEP_INTERVAL_SECONDS * 1000); } } // Test#4: Create Block & Repeatedly Append -> Close DFSOutputStream -> Re-open Block & Repeatedly Append -> Close DFSOutputStream -> Decommission Datanode private static void test4(DFSClient client) throws Exception { // Create DFSOutputStream createStream = createFile(client); appendData(createStream); appendData(createStream); appendData(createStream); createStream.close(); // Append DFSOutputStream outStream = openFileForAppend(client); appendData(outStream); appendData(outStream); appendData(outStream); outStream.close(); } private static DFSOutputStream createFile(DFSClient client) throws Exception { return (DFSOutputStream) client.create(HDFS_PATH, true, REPLICATION, BLOCK_SIZE); } private static DFSOutputStream openFileForAppend(DFSClient client) throws Exception { return (DFSOutputStream) ((HdfsDataOutputStream) client.append(HDFS_PATH, BUFFER_SIZE_KB * 1024, EnumSet.of(CreateFlag.APPEND), null, null)).getWrappedStream(); } private static void appendData(DFSOutputStream outStream) { try { String data = generateData(DATA_SIZE_KB * 1024); outStream.write(data.getBytes(StandardCharsets.UTF_8)); outStream.hflush(); totalSize += DATA_SIZE_KB; logLine(String.format("Append Success %d KB", totalSize)); } catch (Exception e) { logLine(String.format("Append Failed %d KB", totalSize)); e.printStackTrace(); } } private static String generateData(int size) { StringBuilder sb = new StringBuilder(); while (sb.length() < size) { sb.append("A"); } return sb.toString(); } private static void logLine(String line) { Instant now = Instant.now(); System.out.println(TIME_FORMAT.format(now) + " " + line); } } {code} > HDFS decommissioning does not consider if Under Construction blocks are > sufficiently replicated which causes HDFS Data Loss > --------------------------------------------------------------------------------------------------------------------------- > > Key: HDFS-17658 > URL: https://issues.apache.org/jira/browse/HDFS-17658 > Project: Hadoop HDFS > Issue Type: Improvement > Affects Versions: 3.4.0 > Reporter: Kevin Wikant > Priority: Major > > h2. Background > The HDFS Namenode manages datanode decommissioning using the > DatanodeAdminManager > The DatanodeAdminManager has logic to prevent transitioning datanodes to > decommissioned state if they contain open blocks (i.e. Under Construction > blocks) which are not sufficiently replicated to other datanodes. In the > context of decomissioning, the reason that open blocks are important is > because they cannot be replicated to other datanodes given they are actively > being appended by an HDFS client. Because open blocks cannot be moved during > decommissioning, they should prevent the associated datanodes from becoming > decommissioned until the block is completed/finalized and it can be safely > moved to other live datanode(s). > > h2. Problem > The logic for DatanodeAdminManager to avoid decommissioning datanodes which > contain open blocks does not properly consider blocks which are in Under > Construction state. The DatanodeAdminManager is only considering blocks which > are in committed/finalized state. For reference: > * a block which is actively being appended by a DFSOutputStream is in Under > Construction state > * only when a DFSOutputStream is closed does the block transition from Under > Construction state to Committed/Finalized state > * then later when the block is reported to the namenode in a block report, > it will transition from Committed to Completed state > Furthermore: > * this is true for a new file/block that was just created via > "DFSClient.create" > * this is true for an existing file/block that was just opened via > "DFSClient.append" > * this is true for all different dfs.replication factor values > > h2. Impact > In some environments, a datanode being decommissioned is taken as a signal > that all the blocks on that datanodes are sufficiently replicated to other > live datanodes & therefore it is safe to terminate the underlying virtual > host running the datanode. > These types of environments can be impacted by HDFS data loss for open blocks > which are not considered in the datanode decommissioning process. Since open > blocks are not considered in determining if a datanode can be decommissioned, > a datanode may become decommissioned before its open blocks are replicated to > other datanodes. If the decommissioned datanode is then terminated, the open > blocks will be lost. > For open blocks with replication of 1, it takes a single > decommissioned/terminated datanode to cause data loss. For open blocks with > greater replication, all the datanodes holding the open block must be > decommissioned/terminated for there to be data loss. > I would also break the impact down into 2 cases: > * for a new HDFS block that has never been closed, arguably if the > DFSOutputStream encounters failure then the client should be able to replay > the data from source > * for an existing HDFS block that has previously been closed, when this > block is opened via a new DFSOutputStream the block is susceptible to being > lost & the client cannot replay data which was appended in the past by a > different client. This is arguably the worse case of impact. > > h2. Testing > This behaviour has been verified via testing on Hadoop 3.4.0; however, I > suspect it also applies to many other older/newer Hadoop versions. > See JIRA comments for detailed test methodology & results. > The following is a summary of the test cases & test results: > {quote}*Test#1: Create Block & Repeatedly Append in Loop → Decommission > Datanode* > ^ Expectation: block is considered during decommissioning; datanode is not > decommissioned until the write operation is finished & block is replicated to > another datanode. > ^ Observation: block is not considered during decommissioning & is lost when > decommissioned data is terminated. > *Test#2: Create Block & Repeatedly Append in Loop → Close DFSOutputStream → > Decommission Datanode* > ^ Expectation: block is considered during decommissioning & is replicated to > another datanode as part of decommissioning. > ^ Observation: block is considered during decommissioning & is replicated to > another datanode as part of decommissioning. > *Test#3: Create Block & Repeatedly Append in Loop → Close DFSOutputStream → > Re-open Block & Repeatedly Append in Loop → Decommission Datanode* > ^ Expectation: block is considered during decommissioning; datanode is not > decommissioned until the write operation is finished & block is replicated to > another datanode. > ^ Observation: block is not considered during decommissioning & is lost when > decommissioned data is terminated. > *Test#4: Create Block & Repeatedly Append in Loop → Close DFSOutputStream → > Re-open Block & Repeatedly Append in Loop → Close DFSOutputStream → > Decommission Datanode* > ^ Expectation: block is considered during decommissioning & is replicated to > another datanode as part of decommissioning. > ^ Observation: block is not considered during decommissioning & is lost when > decommissioned data is terminated. > {quote} > These were all tested with replication factor of 1 & 2, observation is the > same in both cases. > > h2. Root Cause Theory > The DatanodeAdminManager relies on the DatanodeDescriptor StorageInfos to > identify which blocks to consider as part of wether or not a datanode can be > decommissioned. > Based on an examination of the HDFS Namenode & Datanode DEBUG logs during > testing, I believe the root cause has to do with the following 2 behaviours: > {{*1.* When a DFSOutputStream is created for a block, that block enters Under > Construction state but the Namenode does not add Under Construction blocks to > the StorageInfos unless they have been committed/finalized which only occurs > when the DFSOutputStream is closed. Therefore, by checking the StorageInfos > only, the DatanodeAdminManager is not actually checking Under Construction > blocks.}} > During the testing, we see that the Under Construction blocks are not in the > StorageInfos for the corresponding DatanodeDescriptor: > {quote}2024-11-05 14:32:19,206 DEBUG BlockStateChange: BLOCK* block > RECEIVING_BLOCK: blk_1073741825_1001 is received from 172.31.93.123:9866 > ... > 2024-11-05 14:36:02,805 INFO blockmanagement.DatanodeAdminManager: Starting > decommission of 172.31.93.123:9866 > [DISK]DS-bb1d316c-a47a-4a3a-bf6e-0781945a50d1:NORMAL:172.31.93.123:9866 with > 0 blocks > 2024-11-05 14:36:02,805 INFO blockmanagement.DatanodeAdminManager: Starting > decommission of 172.31.93.123:9866 > [DISK]DS-95cfd2fa-25b0-4b20-aacf-e259201cf2eb:NORMAL:172.31.93.123:9866 with > 0 blocks > {quote} > Whereas, if the DFSOutputStream is closed, we see the block is included in > the StorageInfos: > {quote}2024-11-05 14:49:49,563 DEBUG BlockStateChange: BLOCK* block > RECEIVED_BLOCK: blk_1073741825_1001 is received from 172.31.95.159:9866 > ... > 2024-11-05 14:52:36,770 INFO blockmanagement.DatanodeAdminManager: Starting > decommission of 172.31.95.159:9866 > [DISK]DS-07f2256b-0cbc-4b5e-9c6c-9108650ee896:NORMAL:172.31.95.159:9866 with > 0 blocks > 2024-11-05 14:52:36,770 INFO blockmanagement.DatanodeAdminManager: Starting > decommission of 172.31.95.159:9866 > [DISK]DS-91919280-df53-4dac-b983-0eb12c81e4bf:NORMAL:172.31.95.159:9866 with > 1 blocks > 2024-11-05 14:52:48,231 INFO BlockStateChange: Block: blk_1073741825_1001, > Expected Replicas: 1, live replicas: 0, corrupt replicas: 0, decommissioned > replicas: 0, decommissioning replicas: 1, maintenance replicas: 0, live > entering maintenance replicas: 0, replicas on stale nodes:0, readonly > replicas: 0, excess replicas: 0, Is Open File: false, Datanodes having this > block: 172.31.95.159:9866 , Current Datanode: 172.31.95.159:9866, Is current > datanode decommissioning: true, Is current datanode entering maintenance: > false > {quote} > I think this behaviour might be related to the following code which has been > in Hadoop for the past 10+ years: > [https://github.com/apache/hadoop/blob/51ebc3c20e8ae7d4dced41cdd2f52715aea604cc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java#L3708] > > > {{*2.* When a DFSOutputStream is created for an existing block, the Namenode > marks the existing block with previous generation stamp as stale & removes it > from the StorageInfos.}} > We can see the following DEBUG logs during repro testing: > {quote}2024-11-05 15:47:24,131 INFO namenode.FSNamesystem: > updatePipeline(blk_1073741825_1001, newGS=1002, newLength=307200, > newNodes=[172.31.95.208:9866], client=DFSClient_NONMAPREDUCE_-1408310900_1) > 2024-11-05 15:47:24,132 DEBUG BlockStateChange: BLOCK* Removing stale replica > ReplicaUC[[DISK]DS-1bd20290-4662-4e5e-af0b-9b644d78d4f8:NORMAL:172.31.95.208:9866|RBW] > of blk_1073741825_1001 > 2024-11-05 15:47:24,132 INFO namenode.FSNamesystem: > updatePipeline(blk_1073741825_1001 => blk_1073741825_1002) success > {quote} > Tracing the code from the logs we see where this occurs: > * > [https://github.com/apache/hadoop/blob/f7651e2f63ddba9ed1ae4052e38464f85dd445f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java#L4476] > * > [https://github.com/apache/hadoop/blob/f7651e2f63ddba9ed1ae4052e38464f85dd445f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java#L4433] > * > [https://github.com/apache/hadoop/blob/f7651e2f63ddba9ed1ae4052e38464f85dd445f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java#L202] > > > h3. Potential Solution > If possible, can consider adding Under Construction blocks to StorageInfos. > If this would have other adverse impacts, then another solution is to expose > the Under Construction blocks to the DatanodeAdminManager via another > separate data structure. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org