Author: tomwhite
Date: Mon Nov 17 16:04:35 2014
New Revision: 1640179
URL: http://svn.apache.org/r1640179
Log:
AVRO-834. Java: Data File corruption recovery tool. Contributed by scottcarey
and tomwhite.
Added:
avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileRepairTool.java
avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestDataFileRepairTool.java
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/Main.java
Modified: avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1640179&r1=1640178&r2=1640179&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Mon Nov 17 16:04:35 2014
@@ -29,6 +29,9 @@ Trunk (not yet released)
AVRO-570. Python: Add connector for tethered mapreduce.
(Jeremy Lewi and Steven Willis via cutting)
+ AVRO-834. Java: Data File corruption recovery tool.
+ (scottcarey and tomwhite)
+
OPTIMIZATIONS
IMPROVEMENTS
Modified:
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java?rev=1640179&r1=1640178&r2=1640179&view=diff
==============================================================================
---
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
(original)
+++
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
Mon Nov 17 16:04:35 2014
@@ -251,6 +251,9 @@ public class DataFileStream<D> implement
/** Expert: Return the count of items in the current block. */
public long getBlockCount() { return blockCount; }
+  /**
+   * Expert: Return the size in bytes (uncompressed) of the current block.
+   *
+   * @return the current value of the {@code blockSize} field
+   */
+  public long getBlockSize() {
+    return blockSize;
+  }
+
protected void blockFinished() throws IOException {
// nothing for the stream impl
}
Added:
avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileRepairTool.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileRepairTool.java?rev=1640179&view=auto
==============================================================================
---
avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileRepairTool.java
(added)
+++
avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileRepairTool.java
Mon Nov 17 16:04:35 2014
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.tool;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+
+/**
+ * Recovers data from a corrupt Avro Data file.
+ *
+ * <p>The tool reads the input file block by block, counts the blocks and
+ * records it could and could not read, prints a summary, and (unless run in
+ * report mode) appends the readable records to a new output file. Which
+ * records are written is selected with <code>-o option</code>:
+ * <ul>
+ *   <li><code>all</code> (default): every readable record;</li>
+ *   <li><code>prior</code>: only records read before the first corruption;</li>
+ *   <li><code>after</code>: only records read after the first corruption;</li>
+ *   <li><code>report</code>: nothing is written, only the report is
+ *       printed.</li>
+ * </ul>
+ */
+public class DataFileRepairTool implements Tool {
+
+  private static final Set<String> OPTIONS = new HashSet<String>();
+  private static final String ALL = "all";
+  private static final String PRIOR = "prior";
+  private static final String AFTER = "after";
+  private static final String REPORT = "report";
+  static {
+    OPTIONS.add(ALL);
+    OPTIONS.add(PRIOR);
+    OPTIONS.add(AFTER);
+    OPTIONS.add(REPORT);
+  }
+
+  @Override
+  public String getName() {
+    return "repair";
+  }
+
+  @Override
+  public String getShortDescription() {
+    return "Recovers data from a corrupt Avro Data file";
+  }
+
+  /** Prints the usage message to the given stream. */
+  private void printInfo(PrintStream output) {
+    output.println("Insufficient arguments. Arguments: [-o option] "
+        + "input_file output_file \n"
+        + " Where option is one of the following: \n"
+        + " " + ALL
+        + " (default) recover as many records as possible.\n"
+        + " " + PRIOR
+        + " recover only records prior to the first instance"
+        + " of corruption \n"
+        + " " + AFTER
+        + " recover only records after the first instance of"
+        + " corruption.\n"
+        + " " + REPORT
+        + " print the corruption report only, reporting the\n"
+        + " number of valid and corrupted blocks and records\n"
+        + " input_file is the file to read from. output_file is the file to\n"
+        + " create and write recovered data to. output_file is ignored if\n"
+        + " using the report option.");
+  }
+
+  /**
+   * Parses <code>[-o option] input_file [output_file]</code> and dispatches
+   * to the selected recovery mode.
+   *
+   * @param stdin not used by this tool
+   * @param out receives progress output and the final summary
+   * @param err receives usage text and corruption diagnostics
+   * @param args the command-line arguments
+   * @return 0 on success; 1 on a usage error or an unrecoverable failure
+   */
+  @Override
+  public int run(InputStream stdin, PrintStream out, PrintStream err,
+      List<String> args) throws Exception {
+    // At least two arguments are always required, even for "-o report <in>".
+    if (args.size() < 2) {
+      printInfo(err);
+      return 1;
+    }
+    int index = 0;
+    String option = ALL;
+    if ("-o".equals(args.get(index))) {
+      option = args.get(index + 1);
+      index += 2;
+    }
+    if (!OPTIONS.contains(option) || (args.size() - index < 1)) {
+      printInfo(err);
+      return 1;
+    }
+    String input = args.get(index++);
+    if (REPORT.equals(option)) {
+      // Report mode needs no output file; any further arguments are ignored.
+      return reportOnly(input, out, err);
+    }
+    if (args.size() - index < 1) {
+      printInfo(err);
+      return 1;
+    }
+    String output = args.get(index);
+    if (ALL.equals(option)) {
+      return recoverAll(input, output, out, err);
+    } else if (PRIOR.equals(option)) {
+      return recoverPrior(input, output, out, err);
+    } else {
+      // Membership in OPTIONS was verified above, so only AFTER remains.
+      return recoverAfter(input, output, out, err);
+    }
+  }
+
+  /**
+   * Opens the input file and runs the recovery loop over it.
+   *
+   * <p>When at least one of <code>recoverPrior</code> and
+   * <code>recoverAfter</code> is set, a writer is prepared that carries over
+   * the input's codec and every metadata entry whose key does not start with
+   * <code>"avro."</code>. When both flags are false only the report is
+   * produced and <code>output</code> is not used.
+   *
+   * @return 0 on success, 1 if the input is unreadable or recovery failed
+   * @throws IOException if the input file cannot be opened or closed
+   */
+  private int recover(String input, String output, PrintStream out,
+      PrintStream err, boolean recoverPrior, boolean recoverAfter)
+      throws IOException {
+    File infile = new File(input);
+    if (!infile.canRead()) {
+      err.println("cannot read file: " + input);
+      return 1;
+    }
+    out.println("Recovering file: " + input);
+    GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
+    DataFileReader<Object> fileReader = new DataFileReader<Object>(infile,
+        reader);
+    try {
+      Schema schema = fileReader.getSchema();
+      String codecStr = fileReader.getMetaString(DataFileConstants.CODEC);
+      // If the file has no codec entry, codecStr is null and the concatenation
+      // yields the string "null".
+      // NOTE(review): this relies on "null" being the name of the
+      // no-compression codec; confirm against CodecFactory.
+      CodecFactory codecFactory = CodecFactory.fromString("" + codecStr);
+      List<String> metas = fileReader.getMetaKeys();
+      if (recoverPrior || recoverAfter) {
+        GenericDatumWriter<Object> writer = new GenericDatumWriter<Object>();
+        DataFileWriter<Object> fileWriter = new DataFileWriter<Object>(writer);
+        try {
+          File outfile = new File(output);
+          for (String key : metas) {
+            // Keys starting with "avro." are not copied to the output.
+            if (!key.startsWith("avro.")) {
+              byte[] val = fileReader.getMeta(key);
+              fileWriter.setMeta(key, val);
+            }
+          }
+          fileWriter.setCodec(codecFactory);
+          return innerRecover(fileReader, fileWriter, out, err, recoverPrior,
+              recoverAfter, schema, outfile);
+        } catch (Exception e) {
+          e.printStackTrace(err);
+          return 1;
+        }
+      } else {
+        return innerRecover(fileReader, null, out, err, recoverPrior,
+            recoverAfter, null, null);
+      }
+    } finally {
+      fileReader.close();
+    }
+  }
+
+  /**
+   * Walks the input block by block, counting and optionally re-writing
+   * records.
+   *
+   * <p>A record is written while no corrupt block has been seen (if
+   * <code>recoverPrior</code>) or once at least one has been seen (if
+   * <code>recoverAfter</code>). The output file is created lazily, on the
+   * first record that qualifies, so no output file is created when nothing
+   * is written.
+   *
+   * <p>Within a block, a single unreadable record is counted and reading
+   * continues; a second <em>consecutive</em> unreadable record abandons the
+   * rest of the block. If a block cannot be started at all it is counted as
+   * corrupt with an unknown record count. In both cases the reader is
+   * repositioned with <code>fileReader.sync(position)</code>.
+   *
+   * @param fileWriter the writer to append to; may be null in report mode
+   * @param schema the schema used to create the output; may be null in
+   *          report mode
+   * @param outfile the output file; may be null in report mode
+   * @return 0 once the end of the input is reached, 1 if the output could not
+   *         be created or closed, or if repositioning the reader failed
+   */
+  private int innerRecover(DataFileReader<Object> fileReader,
+      DataFileWriter<Object> fileWriter, PrintStream out, PrintStream err,
+      boolean recoverPrior, boolean recoverAfter, Schema schema, File outfile) {
+    int numBlocks = 0;
+    int numCorruptBlocks = 0;
+    // Record counters are long: they accumulate the long per-block counts,
+    // and an int accumulator would silently truncate on "+=".
+    long numRecords = 0;
+    long numCorruptRecords = 0;
+    long recordsWritten = 0;
+    long position = fileReader.previousSync();
+    long blockSize = 0;
+    long blockCount = 0;
+    boolean fileWritten = false;
+    try {
+      while (true) {
+        try {
+          if (!fileReader.hasNext()) {
+            // End of input: print the summary and finish.
+            out.println("File Summary: ");
+            out.println(" Number of blocks: " + numBlocks
+                + " Number of corrupt blocks: " + numCorruptBlocks);
+            out.println(" Number of records: " + numRecords
+                + " Number of corrupt records: " + numCorruptRecords);
+            if (recoverAfter || recoverPrior) {
+              out.println(" Number of records written " + recordsWritten);
+            }
+            out.println();
+            return 0;
+          }
+          position = fileReader.previousSync();
+          blockCount = fileReader.getBlockCount();
+          blockSize = fileReader.getBlockSize();
+          numRecords += blockCount;
+          long blockRemaining = blockCount;
+          numBlocks++;
+          boolean lastRecordWasBad = false;
+          long badRecordsInBlock = 0;
+          while (blockRemaining > 0) {
+            try {
+              Object datum = fileReader.next();
+              if ((recoverPrior && numCorruptBlocks == 0)
+                  || (recoverAfter && numCorruptBlocks > 0)) {
+                if (!fileWritten) {
+                  try {
+                    fileWriter.create(schema, outfile);
+                    fileWritten = true;
+                  } catch (Exception e) {
+                    e.printStackTrace(err);
+                    return 1;
+                  }
+                }
+                try {
+                  fileWriter.append(datum);
+                  recordsWritten++;
+                } catch (Exception e) {
+                  // Rethrown so the handler below counts the record as
+                  // corrupt.
+                  e.printStackTrace(err);
+                  throw e;
+                }
+              }
+              blockRemaining--;
+              lastRecordWasBad = false;
+            } catch (Exception e) {
+              long pos = blockCount - blockRemaining;
+              if (badRecordsInBlock == 0) {
+                // first corrupt record
+                numCorruptBlocks++;
+                err.println("Corrupt block: " + numBlocks + " Records in block: "
+                    + blockCount + " uncompressed block size: " + blockSize);
+                err.println("Corrupt record at position: "
+                    + (pos));
+              } else {
+                // second bad record in block, if consecutive skip block.
+                err.println("Corrupt record at position: "
+                    + (pos));
+                if (lastRecordWasBad) {
+                  // consecutive bad record
+                  err.println("Second consecutive bad record in block: "
+                      + numBlocks + ". Skipping remainder of block. ");
+                  numCorruptRecords += blockRemaining;
+                  badRecordsInBlock += blockRemaining;
+                  try {
+                    fileReader.sync(position);
+                  } catch (Exception e2) {
+                    err.println("failed to sync to sync marker, aborting");
+                    e2.printStackTrace(err);
+                    return 1;
+                  }
+                  break;
+                }
+              }
+              blockRemaining--;
+              lastRecordWasBad = true;
+              numCorruptRecords++;
+              badRecordsInBlock++;
+            }
+          }
+          if (badRecordsInBlock != 0) {
+            err.println("** Number of unrecoverable records in block: "
+                + (badRecordsInBlock));
+          }
+          position = fileReader.previousSync();
+        } catch (Exception e) {
+          // The block itself could not be read, so its record count is
+          // unknown.
+          err.println("Failed to read block " + numBlocks + ". Unknown record "
+              + "count in block. Skipping. Reason: " + e.getMessage());
+          numCorruptBlocks++;
+          try {
+            fileReader.sync(position);
+          } catch (Exception e2) {
+            err.println("failed to sync to sync marker, aborting");
+            e2.printStackTrace(err);
+            return 1;
+          }
+        }
+      }
+    } finally {
+      // Only close the writer if create() succeeded. A failure while closing
+      // deliberately overrides the pending result with 1.
+      if (fileWritten) {
+        try {
+          fileWriter.close();
+        } catch (Exception e) {
+          e.printStackTrace(err);
+          return 1;
+        }
+      }
+    }
+  }
+
+  /** Prints the corruption report without writing any output file. */
+  private int reportOnly(String input, PrintStream out, PrintStream err)
+      throws IOException {
+    return recover(input, null, out, err, false, false);
+  }
+
+  /** Writes only the records read after the first corruption. */
+  private int recoverAfter(String input, String output, PrintStream out,
+      PrintStream err) throws IOException {
+    return recover(input, output, out, err, false, true);
+  }
+
+  /** Writes only the records read before the first corruption. */
+  private int recoverPrior(String input, String output, PrintStream out,
+      PrintStream err) throws IOException {
+    return recover(input, output, out, err, true, false);
+  }
+
+  /** Writes every record that could be read. */
+  private int recoverAll(String input, String output, PrintStream out,
+      PrintStream err) throws IOException {
+    return recover(input, output, out, err, true, true);
+  }
+}
Modified:
avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/Main.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/Main.java?rev=1640179&r1=1640178&r2=1640179&view=diff
==============================================================================
--- avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/Main.java
(original)
+++ avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/Main.java Mon
Nov 17 16:04:35 2014
@@ -45,6 +45,7 @@ public class Main {
new DataFileWriteTool(),
new DataFileGetMetaTool(),
new DataFileGetSchemaTool(),
+ new DataFileRepairTool(),
new IdlTool(),
new IdlToSchemataTool(),
new RecodecTool(),
Added:
avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestDataFileRepairTool.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestDataFileRepairTool.java?rev=1640179&view=auto
==============================================================================
---
avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestDataFileRepairTool.java
(added)
+++
avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestDataFileRepairTool.java
Mon Nov 17 16:04:35 2014
@@ -0,0 +1,190 @@
+package org.apache.avro.tool;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import org.apache.avro.AvroTestUtil;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryData;
+import org.apache.avro.util.Utf8;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests {@link DataFileRepairTool} against two deliberately damaged copies of
+ * a three-block data file: one with extra bytes inserted before a sync marker
+ * and one with a single record whose string length was overwritten.
+ */
+public class TestDataFileRepairTool {
+
+  private static final Schema SCHEMA = Schema.create(Schema.Type.STRING);
+  private static File corruptBlockFile;
+  private static File corruptRecordFile;
+
+  private File repairedFile;
+
+  /** Builds the two corrupt fixture files shared by all tests. */
+  @BeforeClass
+  public static void writeCorruptFile() throws IOException {
+    // Write a data file
+    DataFileWriter<Utf8> w =
+        new DataFileWriter<Utf8>(new GenericDatumWriter<Utf8>(SCHEMA));
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    w.create(SCHEMA, baos);
+    w.append(new Utf8("apple"));
+    w.append(new Utf8("banana"));
+    w.append(new Utf8("celery"));
+    w.sync();
+    w.append(new Utf8("date"));
+    w.append(new Utf8("endive"));
+    w.append(new Utf8("fig"));
+    long pos = w.sync();
+    w.append(new Utf8("guava"));
+    w.append(new Utf8("hazelnut"));
+    w.close();
+
+    byte[] original = baos.toByteArray();
+
+    // Corrupt the second block by inserting some zero bytes before the sync
+    // marker
+    int corruptPosition = (int) pos - DataFileConstants.SYNC_SIZE;
+    int corruptedBytes = 3;
+    byte[] corrupted = new byte[original.length + corruptedBytes];
+    System.arraycopy(original, 0, corrupted, 0, corruptPosition);
+    System.arraycopy(original, corruptPosition,
+        corrupted, corruptPosition + corruptedBytes,
+        original.length - corruptPosition);
+
+    corruptBlockFile = AvroTestUtil.tempFile(TestDataFileRepairTool.class,
+        "corruptBlock.avro");
+    corruptBlockFile.deleteOnExit();
+    writeBytes(corruptBlockFile, corrupted);
+
+    // Corrupt the "endive" record by changing the length of the string to be
+    // negative
+    corruptPosition = (int) pos - DataFileConstants.SYNC_SIZE
+        - (1 + "fig".length() + 1 + "endive".length());
+    corrupted = new byte[original.length];
+    System.arraycopy(original, 0, corrupted, 0, original.length);
+    BinaryData.encodeLong(-1, corrupted, corruptPosition);
+
+    corruptRecordFile = AvroTestUtil.tempFile(TestDataFileRepairTool.class,
+        "corruptRecord.avro");
+    corruptRecordFile.deleteOnExit();
+    writeBytes(corruptRecordFile, corrupted);
+  }
+
+  /** Writes the bytes to the file, closing the stream even if writing fails. */
+  private static void writeBytes(File file, byte[] bytes) throws IOException {
+    FileOutputStream out = new FileOutputStream(file);
+    try {
+      out.write(bytes);
+    } finally {
+      out.close();
+    }
+  }
+
+  @Before
+  public void setUp() {
+    repairedFile = AvroTestUtil.tempFile(TestDataFileRepairTool.class,
+        "repaired.avro");
+  }
+
+  @After
+  public void tearDown() {
+    repairedFile.delete();
+  }
+
+  private String run(Tool tool, String... args) throws Exception {
+    return run(tool, null, args);
+  }
+
+  /**
+   * Runs the tool and returns everything it printed to its standard output,
+   * with carriage returns removed. The tool's error output goes to
+   * {@code System.err}.
+   */
+  private String run(Tool tool, InputStream stdin, String... args)
+      throws Exception {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    // Encode with the same charset that is used to decode below.
+    PrintStream stdout = new PrintStream(out, true, "UTF-8");
+    tool.run(
+        stdin,
+        stdout,
+        System.err,
+        Arrays.asList(args));
+    stdout.flush();
+    return out.toString("UTF-8").replace("\r", "");
+  }
+
+  @Test
+  public void testReportCorruptBlock() throws Exception {
+    String output = run(new DataFileRepairTool(), "-o", "report",
+        corruptBlockFile.getPath());
+    assertTrue(output, output.contains(
+        "Number of blocks: 2 Number of corrupt blocks: 1"));
+    assertTrue(output, output.contains(
+        "Number of records: 5 Number of corrupt records: 0"));
+  }
+
+  @Test
+  public void testReportCorruptRecord() throws Exception {
+    String output = run(new DataFileRepairTool(), "-o", "report",
+        corruptRecordFile.getPath());
+    assertTrue(output, output.contains(
+        "Number of blocks: 3 Number of corrupt blocks: 1"));
+    assertTrue(output, output.contains(
+        "Number of records: 8 Number of corrupt records: 2"));
+  }
+
+  @Test
+  public void testRepairAllCorruptBlock() throws Exception {
+    String output = run(new DataFileRepairTool(), "-o", "all",
+        corruptBlockFile.getPath(), repairedFile.getPath());
+    assertTrue(output, output.contains(
+        "Number of blocks: 2 Number of corrupt blocks: 1"));
+    assertTrue(output, output.contains(
+        "Number of records: 5 Number of corrupt records: 0"));
+    checkFileContains(repairedFile, "apple", "banana", "celery", "guava",
+        "hazelnut");
+  }
+
+  @Test
+  public void testRepairAllCorruptRecord() throws Exception {
+    String output = run(new DataFileRepairTool(), "-o", "all",
+        corruptRecordFile.getPath(), repairedFile.getPath());
+    assertTrue(output, output.contains(
+        "Number of blocks: 3 Number of corrupt blocks: 1"));
+    assertTrue(output, output.contains(
+        "Number of records: 8 Number of corrupt records: 2"));
+    checkFileContains(repairedFile, "apple", "banana", "celery", "date",
+        "guava", "hazelnut");
+  }
+
+  @Test
+  public void testRepairPriorCorruptBlock() throws Exception {
+    String output = run(new DataFileRepairTool(), "-o", "prior",
+        corruptBlockFile.getPath(), repairedFile.getPath());
+    assertTrue(output, output.contains(
+        "Number of blocks: 2 Number of corrupt blocks: 1"));
+    assertTrue(output, output.contains(
+        "Number of records: 5 Number of corrupt records: 0"));
+    checkFileContains(repairedFile, "apple", "banana", "celery");
+  }
+
+  @Test
+  public void testRepairPriorCorruptRecord() throws Exception {
+    String output = run(new DataFileRepairTool(), "-o", "prior",
+        corruptRecordFile.getPath(), repairedFile.getPath());
+    assertTrue(output, output.contains(
+        "Number of blocks: 3 Number of corrupt blocks: 1"));
+    assertTrue(output, output.contains(
+        "Number of records: 8 Number of corrupt records: 2"));
+    checkFileContains(repairedFile, "apple", "banana", "celery", "date");
+  }
+
+  @Test
+  public void testRepairAfterCorruptBlock() throws Exception {
+    String output = run(new DataFileRepairTool(), "-o", "after",
+        corruptBlockFile.getPath(), repairedFile.getPath());
+    assertTrue(output, output.contains(
+        "Number of blocks: 2 Number of corrupt blocks: 1"));
+    assertTrue(output, output.contains(
+        "Number of records: 5 Number of corrupt records: 0"));
+    checkFileContains(repairedFile, "guava", "hazelnut");
+  }
+
+  @Test
+  public void testRepairAfterCorruptRecord() throws Exception {
+    String output = run(new DataFileRepairTool(), "-o", "after",
+        corruptRecordFile.getPath(), repairedFile.getPath());
+    assertTrue(output, output.contains(
+        "Number of blocks: 3 Number of corrupt blocks: 1"));
+    assertTrue(output, output.contains(
+        "Number of records: 8 Number of corrupt records: 2"));
+    checkFileContains(repairedFile, "guava", "hazelnut");
+  }
+
+  /**
+   * Asserts that the file holds exactly the given strings, in order.
+   * The reader is always closed so the file can be deleted afterwards.
+   */
+  private void checkFileContains(File repairedFile, String... lines)
+      throws IOException {
+    DataFileReader<Utf8> r = new DataFileReader<Utf8>(repairedFile,
+        new GenericDatumReader<Utf8>(SCHEMA));
+    try {
+      for (String line : lines) {
+        assertEquals(line, r.next().toString());
+      }
+      assertFalse(r.hasNext());
+    } finally {
+      r.close();
+    }
+  }
+
+}