Stephan Warren created AVRO-2637: ------------------------------------ Summary: Avro Tool's Repair Tool Can Go Into an Infinite Loop Key: AVRO-2637 URL: https://issues.apache.org/jira/browse/AVRO-2637 Project: Apache Avro Issue Type: Bug Components: tools Affects Versions: 1.9.1 Reporter: Stephan Warren
Some Avro files are corrupted in such a way that the repair tool embarks on an infinite loop. Evidence: two unit tests were added: {code:java} @Test public void testReportCorruptLoopBlock() throws Exception { String corruptLoopFile = "/Users/stephan/IdeaProjects/avro/lang/java/tools/src/test/resources/Report_looping.avro"; String repairedLoopFile = "/Users/stephan/IdeaProjects/avro/lang/java/tools/src/test/resources/Report_looping-FIXED.avro"; String output = run(new DataFileRepairTool(), "-o", "all", corruptLoopFile, repairedLoopFile); assertTrue(output, output.contains("Number of blocks: 5 Number of corrupt blocks: 4")); } @Test public void testRepariedReportCorruptLoopBlock() throws Exception { String repairedLoopFile = "/Users/stephan/IdeaProjects/avro/lang/java/tools/src/test/resources/Report_looping-FIXED.avro"; String output = run(new DataFileRepairTool(), "-o", "report", repairedLoopFile, repairedLoopFile); assertTrue(output, output.contains("Number of blocks: 4 Number of corrupt blocks: 0")); } {code} The output looks like this: {noformat} Failed to read block 0. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync! Failed to read block 3. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync! Failed to read block 3. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync! Failed to read block 3. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync! ... 
{noformat} With the inner recover portion of the code as such: {code:java} private int innerRecover(DataFileReader<Object> fileReader, DataFileWriter<Object> fileWriter, PrintStream out, PrintStream err, boolean recoverPrior, boolean recoverAfter, Schema schema, File outfile) { int numBlocks = 0; int numCorruptBlocks = 0; int numRecords = 0; int lastNumRecords = -1; int numCorruptRecords = 0; int recordsWritten = 0; long lastPostion = -1; long position = fileReader.previousSync(); long blockSize = 0; long blockCount = 0; boolean fileWritten = false; try { while (true) { try { if (!fileReader.hasNext()) { out.println("File Summary: "); out.println(" Number of blocks: " + numBlocks + " Number of corrupt blocks: " + numCorruptBlocks); out.println(" Number of records: " + numRecords + " Number of corrupt records: " + numCorruptRecords); if (recoverAfter || recoverPrior) { out.println(" Number of records written " + recordsWritten); } out.println(); return 0; } position = fileReader.previousSync(); blockCount = fileReader.getBlockCount(); blockSize = fileReader.getBlockSize(); numRecords += blockCount; long blockRemaining = blockCount; numBlocks++; boolean lastRecordWasBad = false; long badRecordsInBlock = 0; err.println("Details Prior: numblocks: "+numBlocks + ", blockRemaining: "+ blockRemaining + ", lastRecordWasBad: " + lastRecordWasBad + ", numCorruptRecords: " + numCorruptRecords + ", badRecordsInBloc: " + badRecordsInBlock); while (blockRemaining > 0) { try { Object datum = fileReader.next(); if ((recoverPrior && numCorruptBlocks == 0) || (recoverAfter && numCorruptBlocks > 0)) { if (!fileWritten) { try { fileWriter.create(schema, outfile); fileWritten = true; } catch (Exception e) { e.printStackTrace(err); return 1; } } try { fileWriter.append(datum); recordsWritten++; } catch (Exception e) { e.printStackTrace(err); throw e; } } blockRemaining--; lastRecordWasBad = false; // err.println("Details #1: blockRemaining: "+ blockRemaining + // ", lastRecordWasBad: " + 
lastRecordWasBad + // ", numCorruptRecords: " + numCorruptRecords + // ", badRecordsInBloc: " + badRecordsInBlock); } catch (Exception e) { long pos = blockCount - blockRemaining; if (badRecordsInBlock == 0) { // first corrupt record numCorruptBlocks++; err.println("Corrupt block: " + numBlocks + " Records in block: " + blockCount + " uncompressed block size: " + blockSize); err.println("Corrupt record at position: " + (pos)); } else { // second bad record in block, if consecutive skip block. err.println("Corrupt record at position: " + (pos)); if (lastRecordWasBad) { // consecutive bad record err.println( "Second consecutive bad record in block: " + numBlocks + ". Skipping remainder of block. "); numCorruptRecords += blockRemaining; badRecordsInBlock += blockRemaining; try { fileReader.sync(position); } catch (Exception e2) { err.println("failed to sync to sync marker, aborting"); e2.printStackTrace(err); return 1; } break; } } blockRemaining--; lastRecordWasBad = true; numCorruptRecords++; badRecordsInBlock++; err.println("Details #2: blockRemaining: "+ blockRemaining + ", lastRecordWasBad: " + lastRecordWasBad + ", numCorruptRecords: " + numCorruptRecords + ", badRecordsInBloc: " + badRecordsInBlock); } } err.println("Details After: blockRemaining: "+ blockRemaining + ", lastRecordWasBad: " + lastRecordWasBad + ", numCorruptRecords: " + numCorruptRecords + ", badRecordsInBloc: " + badRecordsInBlock); if (badRecordsInBlock != 0) { err.println("** Number of unrecoverable records in block: " + (badRecordsInBlock)); } position = fileReader.previousSync(); } catch (Exception e) { // if(lastNumRecords == numRecords) { // if(lastPostion == position) { if(false) { position++; } else { lastNumRecords = numRecords; lastPostion = position; err.println("Failed to read block " + numBlocks + ". Unknown record " + "count in block. Skipping. 
Reason: " + e.getMessage()); numCorruptBlocks++; err.printf( " int numBlocks = %d;\n" + " int numCorruptBlocks = %d;\n" + " int numRecords = %d;\n" + " int numCorruptRecords = %d;\n" + " int recordsWritten = %d;\n" + " long position = %d / 0x%04x;\n" + " long blockSize = %d / 0x%04x;\n" + " long blockCount = %d / 0x%04x\n", numBlocks, numCorruptBlocks, numRecords, numCorruptRecords, recordsWritten, position, position, blockSize, blockSize, blockCount, blockCount); try { fileReader.sync(position); } catch (Exception e2) { err.println("failed to sync to sync marker, aborting"); e2.printStackTrace(err); return 1; } } } } } finally { if (fileWritten) { try { fileWriter.close(); } catch (Exception e) { e.printStackTrace(err); return 1; } } } } {code} With the above code, we can see a pattern emerge: {noformat} Failed to read block 0. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync! int numBlocks = 0; int numCorruptBlocks = 1; int numRecords = 0; int numCorruptRecords = 0; int recordsWritten = 0; long position = 6504 / 0x1968; long blockSize = 0 / 0x0000; long blockCount = 0 / 0x0000 Details Prior: numblocks: 1, blockRemaining: 464, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0 Details After: blockRemaining: 0, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0 Details Prior: numblocks: 2, blockRemaining: 464, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0 Details After: blockRemaining: 0, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0 Details Prior: numblocks: 3, blockRemaining: 464, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0 Details After: blockRemaining: 0, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0 Failed to read block 3. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync! 
int numBlocks = 3; int numCorruptBlocks = 2; int numRecords = 1392; int numCorruptRecords = 0; int recordsWritten = 1392; long position = 248627 / 0x3cb33; long blockSize = 17611 / 0x44cb; long blockCount = 464 / 0x01d0 Failed to read block 3. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync! int numBlocks = 3; int numCorruptBlocks = 3; int numRecords = 1392; int numCorruptRecords = 0; int recordsWritten = 1392; long position = 248627 / 0x3cb33; long blockSize = 17611 / 0x44cb; long blockCount = 464 / 0x01d0 Failed to read block 3. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync! int numBlocks = 3; int numCorruptBlocks = 4; int numRecords = 1392; int numCorruptRecords = 0; int recordsWritten = 1392; long position = 248627 / 0x3cb33; long blockSize = 17611 / 0x44cb; long blockCount = 464 / 0x01d0 Failed to read block 3. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync! int numBlocks = 3; int numCorruptBlocks = 5; int numRecords = 1392; int numCorruptRecords = 0; int recordsWritten = 1392; long position = 248627 / 0x3cb33; long blockSize = 17611 / 0x44cb; long blockCount = 464 / 0x01d0 {noformat} Note that `position` stays at 248627 on every retry, and only numCorruptBlocks keeps growing: calling `fileReader.sync(position)` with that same position apparently never moves the reader past the corrupt block, so the same failure repeats forever. While this is probably not the correct fix, the following change does let the repair complete with the code above: {code:java} if(lastPostion == position) { // if(false) { {code} I cannot share the proprietary file, but I'm happy to answer questions. -- This message was sent by Atlassian Jira (v8.3.4#803005)