This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/master by this push:
new 3f3f62f ORC-697: Improve scan tool to report the location of corruption
3f3f62f is described below
commit 3f3f62f721efa1c139be9bcfc05fa3d70fe4ed14
Author: Owen O'Malley <[email protected]>
AuthorDate: Tue Dec 15 06:42:48 2020 +0000
ORC-697: Improve scan tool to report the location of corruption
### What changes were proposed in this pull request?
This PR updates the scan tool to print information about where the file is
corrupted. It
* reads data by batches until there is a problem
* tries re-reading that batch column by column to find which column is
corrupted
* figures out the next location that the reader can seek to
### Why are the changes needed?
It helps diagnose where (row & column) an ORC file is corrupted.
### How was this patch tested?
It was tested on ORC files that were corrupted by bad machines.
---
.../java/org/apache/orc/impl/RecordReaderImpl.java | 6 +-
.../src/java/org/apache/orc/tools/ScanData.java | 182 ++++++++++++++++++---
2 files changed, 161 insertions(+), 27 deletions(-)
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index e502254..781e13c 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -266,10 +266,12 @@ public class RecordReaderImpl implements RecordReader {
try {
advanceToNextRow(reader, 0L, true);
- } catch (IOException e) {
+ } catch (Exception e) {
// Try to close since this happens in constructor.
close();
- throw e;
+ long stripeId = stripes.size() == 0 ? 0 : stripes.get(0).getStripeId();
+ throw new IOException(String.format("Problem opening stripe %d footer in %s.",
+ stripeId, path), e);
}
}
diff --git a/java/tools/src/java/org/apache/orc/tools/ScanData.java b/java/tools/src/java/org/apache/orc/tools/ScanData.java
index e614c9f..35fcb9d 100644
--- a/java/tools/src/java/org/apache/orc/tools/ScanData.java
+++ b/java/tools/src/java/org/apache/orc/tools/ScanData.java
@@ -18,7 +18,7 @@
package org.apache.orc.tools;
import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
@@ -26,9 +26,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
-import org.codehaus.jettison.json.JSONException;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -40,41 +40,173 @@ public class ScanData {
static CommandLine parseCommandLine(String[] args) throws ParseException {
Options options = new Options()
- .addOption("help", "h", false, "Provide help");
- return new GnuParser().parse(options, args);
+ .addOption("s", "schema", false, "Print schema")
+ .addOption("h", "help", false, "Provide help");
+ return new DefaultParser().parse(options, args);
}
+ static int calculateBestVectorSize(int indexStride) {
+ if (indexStride == 0) {
+ return 1024;
+ }
+ // how many 1024 batches do we have in an index stride?
+ int batchCount = (indexStride + 1023) / 1024;
+ return indexStride / batchCount;
+ }
+
+ static class LocationInfo {
+ final long firstRow;
+ final long followingRow;
+ final int stripeId;
+ final long row;
+
+ LocationInfo(long firstRow, long followingRow, int stripeId,
+ long row) {
+ this.firstRow = firstRow;
+ this.followingRow = followingRow;
+ this.stripeId = stripeId;
+ this.row = row;
+ }
+
+ public String toString() {
+ return String.format("row %d in stripe %d (rows %d-%d)",
+ row, stripeId, firstRow, followingRow);
+ }
+ }
+
+ /**
+ * Given a row, find the stripe that contains that row.
+ * @param reader the file reader
+ * @param row the global row number in the file
+ * @return the information about that row in the file
+ */
+ static LocationInfo findStripeInfo(Reader reader, long row) {
+ long firstRow = 0;
+ int stripeId = 0;
+ for (StripeInformation stripe: reader.getStripes()) {
+ stripeId += 1;
+ long lastRow = firstRow + stripe.getNumberOfRows();
+ if (firstRow <= row && row < lastRow) {
+ return new LocationInfo(firstRow, lastRow, stripeId, row);
+ }
+ firstRow = lastRow;
+ }
+ return new LocationInfo(reader.getNumberOfRows(),
+ reader.getNumberOfRows(), -1, row);
+ }
- static void main(Configuration conf, String[] args
- ) throws IOException, JSONException, ParseException {
+ /**
+ * Given a failure point, find the first place that the ORC reader can
+ * recover.
+ * @param reader the ORC reader
+ * @param current the position of the failure
+ * @param batchSize the size of the batch that we tried to read
+ * @return the location that we should recover to
+ */
+ static LocationInfo findRecoveryPoint(Reader reader, LocationInfo current,
+ int batchSize) {
+ int stride = reader.getRowIndexStride();
+ long result;
+ // In the worst case, just move to the next stripe
+ if (stride == 0 ||
+ current.row + batchSize >= current.followingRow) {
+ result = current.followingRow;
+ } else {
+ long rowInStripe = current.row + batchSize - current.firstRow;
+ result = current.firstRow + (rowInStripe + stride - 1) / stride * stride;
+ }
+ return findStripeInfo(reader, result);
+ }
+
+ static boolean findBadColumns(Reader reader, LocationInfo current, int batchSize,
+ TypeDescription column, boolean[] include) {
+ include[column.getId()] = true;
+ TypeDescription schema = reader.getSchema();
+ boolean result = false;
+ if (column.getChildren() == null) {
+ int row = 0;
+ try (RecordReader rows = reader.rows(reader.options().include(include))) {
+ rows.seekToRow(current.row);
+ VectorizedRowBatch batch = schema.createRowBatch(
+ TypeDescription.RowBatchVersion.USE_DECIMAL64, 1);
+ for(row=0; row < batchSize; ++row) {
+ rows.nextBatch(batch);
+ }
+ } catch (Throwable t) {
+ System.out.printf("Column %d failed at row %d%n", column.getId(),
+ current.row + row);
+ result = true;
+ }
+ } else {
+ for(TypeDescription child: column.getChildren()) {
+ result |= findBadColumns(reader, current, batchSize, child, include);
+ }
+ }
+ include[column.getId()] = false;
+ return result;
+ }
+
+ static void main(Configuration conf, String[] args) throws ParseException {
CommandLine cli = parseCommandLine(args);
if (cli.hasOption('h') || cli.getArgs().length == 0) {
- System.err.println("usage: java -jar orc-tools-*.jar scan [--help] <orc file>*");
+ System.err.println("usage: java -jar orc-tools-*.jar scan [--schema] [--help] <orc file>*");
System.exit(1);
} else {
+ boolean printSchema = cli.hasOption('s');
List<String> badFiles = new ArrayList<>();
for (String file : cli.getArgs()) {
- try {
- Path path = new Path(file);
- Reader reader = FileDump.getReader(path, conf, badFiles);
- if (reader == null) {
- continue;
- }
- RecordReader rows = reader.rows();
- VectorizedRowBatch batch = reader.getSchema().createRowBatch();
- long batchCount = 0;
- long rowCount = 0;
- while (rows.nextBatch(batch)) {
- batchCount += 1;
- rowCount += batch.size;
+ try (Reader reader = FileDump.getReader(new Path(file), conf, badFiles)) {
+ if (reader != null) {
+ TypeDescription schema = reader.getSchema();
+ if (printSchema) {
+ System.out.println(schema.toJson());
+ }
+ VectorizedRowBatch batch = schema.createRowBatch(
+ TypeDescription.RowBatchVersion.USE_DECIMAL64,
+ calculateBestVectorSize(reader.getRowIndexStride()));
+ final int batchSize = batch.getMaxSize();
+ long badBatches = 0;
+ long currentRow = 0;
+ long goodRows = 0;
+ try (RecordReader rows = reader.rows()) {
+ while (currentRow < reader.getNumberOfRows()) {
+ currentRow = rows.getRowNumber();
+ try {
+ if (!rows.nextBatch(batch)) {
+ break;
+ }
+ goodRows += batch.size;
+ } catch (Exception e) {
+ badBatches += 1;
+ LocationInfo current = findStripeInfo(reader, currentRow);
+ LocationInfo recover = findRecoveryPoint(reader, current, batchSize);
+ System.out.println("Unable to read batch at " + current +
+ ", recovery at " + recover);
+ e.printStackTrace();
+ findBadColumns(reader, current, batchSize, reader.getSchema(),
+ new boolean[reader.getSchema().getMaximumId() + 1]);
+ // There are no more rows, so break the loop
+ if (recover.stripeId == -1) {
+ break;
+ } else {
+ rows.seekToRow(recover.row);
+ }
+ }
+ }
+ }
+ if (badBatches != 0) {
+ badFiles.add(file);
+ }
+ System.out.printf("File: %s, bad batches: %d, rows: %d/%d%n", file,
+ badBatches, goodRows, reader.getNumberOfRows());
}
- System.out.println("File " + path + ": " + batchCount +
- " batches, and " + rowCount + " rows.");
} catch (Exception e) {
- System.err.println("Unable to dump data for file: " + file);
- continue;
+ badFiles.add(file);
+ System.err.println("Unable to open file: " + file);
+ e.printStackTrace();
}
}
+ System.exit(badFiles.size());
}
}
}