Stephan Warren created AVRO-2637:
------------------------------------

             Summary: Avro Tool's Repair Tool Can Go Into an Infinite Loop
                 Key: AVRO-2637
                 URL: https://issues.apache.org/jira/browse/AVRO-2637
             Project: Apache Avro
          Issue Type: Bug
          Components: tools
    Affects Versions: 1.9.1
            Reporter: Stephan Warren


Certain Avro files are corrupted in such a way that the repair tool embarks 
on an infinite loop. Evidence:

Two unit tests were added:

 
{code:java}
  @Test
  public void testReportCorruptLoopBlock() throws Exception {
    String corruptLoopFile = "/Users/stephan/IdeaProjects/avro/lang/java/tools/src/test/resources/Report_looping.avro";
    String repairedLoopFile = "/Users/stephan/IdeaProjects/avro/lang/java/tools/src/test/resources/Report_looping-FIXED.avro";
    String output = run(new DataFileRepairTool(), "-o", "all", corruptLoopFile, repairedLoopFile);
    assertTrue(output, output.contains("Number of blocks: 5 Number of corrupt blocks: 4"));
  }
  @Test
  public void testRepariedReportCorruptLoopBlock() throws Exception {
    String repairedLoopFile = "/Users/stephan/IdeaProjects/avro/lang/java/tools/src/test/resources/Report_looping-FIXED.avro";
    String output = run(new DataFileRepairTool(), "-o", "report", repairedLoopFile, repairedLoopFile);
    assertTrue(output, output.contains("Number of blocks: 4 Number of corrupt blocks: 0"));
  }

{code}
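
Because the failure mode here is a loop that never returns, a test that calls the repair tool directly would hang rather than fail. The following is a minimal sketch of one way to bound such a call using only java.util.concurrent. The class name TimeoutGuardDemo and the helper runWithTimeout are hypothetical and are not part of Avro or its test suite. Where JUnit 4 is in use, its @Test(timeout = ...) attribute is an alternative.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper for bounding a call that may never return. */
class TimeoutGuardDemo {

  /**
   * Runs the task on a daemon worker thread. Returns the task's result, or
   * null if the task did not finish within timeoutMillis (so a task that
   * legitimately returns null cannot be told apart from a timeout).
   */
  static <T> T runWithTimeout(Callable<T> task, long timeoutMillis) {
    ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r);
      t.setDaemon(true); // a stuck task must not keep the JVM alive
      return t;
    });
    Future<T> future = executor.submit(task);
    try {
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // Interrupts the worker; a loop that ignores interrupts keeps
      // spinning on the daemon thread until the JVM exits.
      future.cancel(true);
      return null;
    } catch (InterruptedException | ExecutionException e) {
      throw new RuntimeException(e);
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    // Stand-in for a call that loops forever, such as the repair run above.
    Callable<String> spin = () -> {
      while (true) {
        Thread.sleep(10);
      }
    };
    System.out.println("quick task: " + runWithTimeout(() -> "done", 1000));
    System.out.println("stuck task: " + runWithTimeout(spin, 200));
  }
}
```

In a test, a null result from the stuck call would be turned into an assertion failure, so the build reports the hang instead of blocking on it.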
 

The output looks like this:

 

 
{noformat}
Failed to read block 0. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync!
Failed to read block 3. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync!
Failed to read block 3. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync!
Failed to read block 3. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync!
...
{noformat}
 

 

The innerRecover method, instrumented with extra debug output, is as follows:
{code:java}
  private int innerRecover(DataFileReader<Object> fileReader, DataFileWriter<Object> fileWriter, PrintStream out,
      PrintStream err, boolean recoverPrior, boolean recoverAfter, Schema schema, File outfile) {
    int numBlocks = 0;
    int numCorruptBlocks = 0;
    int numRecords = 0;
    int lastNumRecords = -1;
    int numCorruptRecords = 0;
    int recordsWritten = 0;
    long lastPostion = -1;
    long position = fileReader.previousSync();
    long blockSize = 0;
    long blockCount = 0;
    boolean fileWritten = false;
    try {
      while (true) {
        try {
          if (!fileReader.hasNext()) {
            out.println("File Summary: ");
            out.println("  Number of blocks: " + numBlocks + " Number of corrupt blocks: " + numCorruptBlocks);
            out.println("  Number of records: " + numRecords + " Number of corrupt records: " + numCorruptRecords);
            if (recoverAfter || recoverPrior) {
              out.println("  Number of records written " + recordsWritten);
            }
            out.println();
            return 0;
          }
          position = fileReader.previousSync();
          blockCount = fileReader.getBlockCount();
          blockSize = fileReader.getBlockSize();
          numRecords += blockCount;
          long blockRemaining = blockCount;
          numBlocks++;
          boolean lastRecordWasBad = false;
          long badRecordsInBlock = 0;
          err.println("Details Prior: numblocks: "+numBlocks +
            ", blockRemaining: "+ blockRemaining +
            ", lastRecordWasBad: " + lastRecordWasBad +
            ", numCorruptRecords: " + numCorruptRecords +
            ", badRecordsInBloc: " + badRecordsInBlock);
          while (blockRemaining > 0) {
            try {
              Object datum = fileReader.next();
              if ((recoverPrior && numCorruptBlocks == 0) || (recoverAfter && numCorruptBlocks > 0)) {
                if (!fileWritten) {
                  try {
                    fileWriter.create(schema, outfile);
                    fileWritten = true;
                  } catch (Exception e) {
                    e.printStackTrace(err);
                    return 1;
                  }
                }
                try {
                  fileWriter.append(datum);
                  recordsWritten++;
                } catch (Exception e) {
                  e.printStackTrace(err);
                  throw e;
                }
              }
              blockRemaining--;
              lastRecordWasBad = false;
//              err.println("Details #1: blockRemaining: "+ blockRemaining +
//                ", lastRecordWasBad: " + lastRecordWasBad +
//                ", numCorruptRecords: " + numCorruptRecords +
//                ", badRecordsInBloc: " + badRecordsInBlock);
            } catch (Exception e) {
              long pos = blockCount - blockRemaining;
              if (badRecordsInBlock == 0) {
                // first corrupt record
                numCorruptBlocks++;
                err.println("Corrupt block: " + numBlocks + " Records in block: " + blockCount
                    + " uncompressed block size: " + blockSize);
                err.println("Corrupt record at position: " + (pos));
              } else {
                // second bad record in block, if consecutive skip block.
                err.println("Corrupt record at position: " + (pos));
                if (lastRecordWasBad) {
                  // consecutive bad record
                  err.println(
                      "Second consecutive bad record in block: " + numBlocks + ". Skipping remainder of block. ");
                  numCorruptRecords += blockRemaining;
                  badRecordsInBlock += blockRemaining;
                  try {
                    fileReader.sync(position);
                  } catch (Exception e2) {
                    err.println("failed to sync to sync marker, aborting");
                    e2.printStackTrace(err);
                    return 1;
                  }
                  break;
                }
              }
              blockRemaining--;
              lastRecordWasBad = true;
              numCorruptRecords++;
              badRecordsInBlock++;
              err.println("Details #2: blockRemaining: "+ blockRemaining +
                ", lastRecordWasBad: " + lastRecordWasBad +
                ", numCorruptRecords: " + numCorruptRecords +
                ", badRecordsInBloc: " + badRecordsInBlock);
            }
          }
          err.println("Details After: blockRemaining: "+ blockRemaining +
            ", lastRecordWasBad: " + lastRecordWasBad +
            ", numCorruptRecords: " + numCorruptRecords +
            ", badRecordsInBloc: " + badRecordsInBlock);

          if (badRecordsInBlock != 0) {
            err.println("** Number of unrecoverable records in block: " + (badRecordsInBlock));
          }
          position = fileReader.previousSync();
        } catch (Exception e) {

//          if(lastNumRecords == numRecords) {
//          if(lastPostion == position) {
          if(false) {
              position++;
          }
          else {
            lastNumRecords = numRecords;
            lastPostion = position;
            err.println("Failed to read block " + numBlocks + ". Unknown record " + "count in block.  Skipping. Reason: "
              + e.getMessage());

            numCorruptBlocks++;

            err.printf(
              "    int numBlocks = %d;\n" +
                "    int numCorruptBlocks = %d;\n" +
                "    int numRecords = %d;\n" +
                "    int numCorruptRecords = %d;\n" +
                "    int recordsWritten = %d;\n" +
                "    long position = %d / 0x%04x;\n" +
                "    long blockSize = %d / 0x%04x;\n" +
                "    long blockCount = %d / 0x%04x\n",
              numBlocks,
              numCorruptBlocks,
              numRecords,
              numCorruptRecords,
              recordsWritten,
              position, position,
              blockSize, blockSize,
              blockCount, blockCount);
            try {
              fileReader.sync(position);
            } catch (Exception e2) {
              err.println("failed to sync to sync marker, aborting");
              e2.printStackTrace(err);
              return 1;
            }
          }
        }
      }
    } finally {
      if (fileWritten) {
        try {
          fileWriter.close();
        } catch (Exception e) {
          e.printStackTrace(err);
          return 1;
        }
      }
    }
  }
{code}


With the above code, we can see a pattern emerge:


{noformat}
Failed to read block 0. Unknown record count in block.  Skipping. Reason: java.io.IOException: Invalid sync!
    int numBlocks = 0;
    int numCorruptBlocks = 1;
    int numRecords = 0;
    int numCorruptRecords = 0;
    int recordsWritten = 0;
    long position = 6504 / 0x1968;
    long blockSize = 0 / 0x0000;
    long blockCount = 0 / 0x0000
Details Prior: numblocks: 1, blockRemaining: 464, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0
Details After: blockRemaining: 0, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0
Details Prior: numblocks: 2, blockRemaining: 464, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0
Details After: blockRemaining: 0, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0
Details Prior: numblocks: 3, blockRemaining: 464, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0
Details After: blockRemaining: 0, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0
Failed to read block 3. Unknown record count in block.  Skipping. Reason: java.io.IOException: Invalid sync!
    int numBlocks = 3;
    int numCorruptBlocks = 2;
    int numRecords = 1392;
    int numCorruptRecords = 0;
    int recordsWritten = 1392;
    long position = 248627 / 0x3cb33;
    long blockSize = 17611 / 0x44cb;
    long blockCount = 464 / 0x01d0
Failed to read block 3. Unknown record count in block.  Skipping. Reason: java.io.IOException: Invalid sync!
    int numBlocks = 3;
    int numCorruptBlocks = 3;
    int numRecords = 1392;
    int numCorruptRecords = 0;
    int recordsWritten = 1392;
    long position = 248627 / 0x3cb33;
    long blockSize = 17611 / 0x44cb;
    long blockCount = 464 / 0x01d0
Failed to read block 3. Unknown record count in block.  Skipping. Reason: java.io.IOException: Invalid sync!
    int numBlocks = 3;
    int numCorruptBlocks = 4;
    int numRecords = 1392;
    int numCorruptRecords = 0;
    int recordsWritten = 1392;
    long position = 248627 / 0x3cb33;
    long blockSize = 17611 / 0x44cb;
    long blockCount = 464 / 0x01d0
Failed to read block 3. Unknown record count in block.  Skipping. Reason: java.io.IOException: Invalid sync!
    int numBlocks = 3;
    int numCorruptBlocks = 5;
    int numRecords = 1392;
    int numCorruptRecords = 0;
    int recordsWritten = 1392;
    long position = 248627 / 0x3cb33;
    long blockSize = 17611 / 0x44cb;
    long blockCount = 464 / 0x01d0

{noformat}

While this is probably not the correct or acceptable fix, the following change 
seems to let the repair run to completion in the code provided above:
{code:java}
          if(lastPostion == position) {
//          if(false) {
{code}
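
The effect of that guard can be illustrated with a toy model that does not use the Avro API. The sketch rests on the assumption that a failed read leaves the reader at the start of the bad block. Here seek() stands in for fileReader.sync(position), and every name (ResyncGuardDemo, seek, recover, corruptBlock) is hypothetical.

```java
/** Toy model of a resync loop; it does not use the Avro API. */
class ResyncGuardDemo {

  /** Returns the first block start at or after pos, or -1 if there is none. */
  static long seek(long[] blockStarts, long pos) {
    for (long start : blockStarts) {
      if (start >= pos) {
        return start;
      }
    }
    return -1;
  }

  /**
   * Walks the blocks and returns the number of failed reads once the end is
   * reached, or -1 if maxIterations is used up first (the loop stalled).
   */
  static int recover(long[] blockStarts, long corruptBlock, boolean guard, int maxIterations) {
    long position = 0;
    long lastPosition = -1;
    int failures = 0;
    for (int i = 0; i < maxIterations; i++) {
      long block = seek(blockStarts, position);
      if (block < 0) {
        return failures; // past the last block: done
      }
      if (block != corruptBlock) {
        position = block + 1; // block read cleanly, continue after it
        continue;
      }
      failures++;
      position = block; // assumption: a failed read leaves us at the bad block
      if (guard && position == lastPosition) {
        position++; // no progress since the last failure: step forward one byte
      } else {
        lastPosition = position; // remember where this failure happened
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    long[] blockStarts = {0, 100, 200}; // must be sorted ascending
    System.out.println("with guard:    " + recover(blockStarts, 100, true, 1000));
    System.out.println("without guard: " + recover(blockStarts, 100, false, 1000));
  }
}
```

With the guard the walk terminates; without it the iteration budget runs out, which mirrors the repeated "Failed to read block 3" entries in the log above. The bad block is counted on every failed attempt in this model, much as numCorruptBlocks keeps climbing in that log.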

I cannot provide the proprietary file, but I'd be happy to address questions.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
