Author: todd
Date: Wed Jul 13 16:12:42 2011
New Revision: 1146111
URL: http://svn.apache.org/viewvc?rev=1146111&view=rev
Log:
HADOOP-7444. Add Checksum API to verify and calculate checksums "in bulk".
Contributed by Todd Lipcon.
Added:
hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestDataChecksum.java
Modified:
hadoop/common/trunk/common/CHANGES.txt
hadoop/common/trunk/common/src/java/org/apache/hadoop/util/DataChecksum.java
Modified: hadoop/common/trunk/common/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/common/CHANGES.txt?rev=1146111&r1=1146110&r2=1146111&view=diff
==============================================================================
--- hadoop/common/trunk/common/CHANGES.txt (original)
+++ hadoop/common/trunk/common/CHANGES.txt Wed Jul 13 16:12:42 2011
@@ -259,6 +259,9 @@ Trunk (unreleased changes)
HADOOP-7457. Remove out-of-date Chinese language documentation.
(Jakob Homan via eli)
+ HADOOP-7444. Add Checksum API to verify and calculate checksums "in bulk"
+ (todd)
+
OPTIMIZATIONS
HADOOP-7333. Performance improvement in PureJavaCrc32. (Eric Caspole
Modified:
hadoop/common/trunk/common/src/java/org/apache/hadoop/util/DataChecksum.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/java/org/apache/hadoop/util/DataChecksum.java?rev=1146111&r1=1146110&r2=1146111&view=diff
==============================================================================
---
hadoop/common/trunk/common/src/java/org/apache/hadoop/util/DataChecksum.java
(original)
+++
hadoop/common/trunk/common/src/java/org/apache/hadoop/util/DataChecksum.java
Wed Jul 13 16:12:42 2011
@@ -21,10 +21,12 @@ package org.apache.hadoop.util;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.zip.Checksum;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.ChecksumException;
/**
 * This class provides interface and utilities for processing checksums for
@@ -234,6 +236,157 @@ public class DataChecksum implements Che
}
/**
+ * Verify that the given checksums match the given data.
+ *
+ * The 'mark' of the ByteBuffer parameters may be modified by this function,
+ * but the position is maintained.
+ *
+ * @param data the ByteBuffer (direct or array-backed) holding the data to verify.
+ * @param checksums the ByteBuffer (direct or array-backed) holding a series of
+ * stored checksums
+ * @param fileName the name of the file being read, for error-reporting
+ * @param basePos the file position to which the start of 'data' corresponds
+ * @throws ChecksumException if the checksums do not match
+ */
+ public void verifyChunkedSums(ByteBuffer data, ByteBuffer checksums,
+ String fileName, long basePos)
+ throws ChecksumException {
+ if (size == 0) return;
+
+ if (data.hasArray() && checksums.hasArray()) {
+ verifyChunkedSums(
+ data.array(), data.arrayOffset() + data.position(), data.remaining(),
+ checksums.array(), checksums.arrayOffset() + checksums.position(),
+ fileName, basePos);
+ return;
+ }
+
+ int startDataPos = data.position();
+ data.mark();
+ checksums.mark();
+ try {
+ byte[] buf = new byte[bytesPerChecksum];
+ byte[] sum = new byte[size];
+ while (data.remaining() > 0) {
+ int n = Math.min(data.remaining(), bytesPerChecksum);
+ checksums.get(sum);
+ data.get(buf, 0, n);
+ summer.reset();
+ summer.update(buf, 0, n);
+ int calculated = (int)summer.getValue();
+ int stored = (sum[0] << 24 & 0xff000000) |
+ (sum[1] << 16 & 0xff0000) |
+ (sum[2] << 8 & 0xff00) |
+ sum[3] & 0xff;
+ if (calculated != stored) {
+ long errPos = basePos + data.position() - startDataPos - n;
+ throw new ChecksumException(
+ "Checksum error: "+ fileName + " at "+ errPos +
+ " exp: " + stored + " got: " + calculated, errPos);
+ }
+ }
+ } finally {
+ data.reset();
+ checksums.reset();
+ }
+ }
+
+ /**
+ * Implementation of chunked verification specifically on byte arrays. This
+ * is to avoid the copy when dealing with ByteBuffers that have array backing.
+ */
+ private void verifyChunkedSums(
+ byte[] data, int dataOff, int dataLen,
+ byte[] checksums, int checksumsOff, String fileName,
+ long basePos) throws ChecksumException {
+
+ int remaining = dataLen;
+ int dataPos = 0;
+ while (remaining > 0) {
+ int n = Math.min(remaining, bytesPerChecksum);
+
+ summer.reset();
+ summer.update(data, dataOff + dataPos, n);
+ dataPos += n;
+ remaining -= n;
+
+ int calculated = (int)summer.getValue();
+ int stored = (checksums[checksumsOff] << 24 & 0xff000000) |
+ (checksums[checksumsOff + 1] << 16 & 0xff0000) |
+ (checksums[checksumsOff + 2] << 8 & 0xff00) |
+ checksums[checksumsOff + 3] & 0xff;
+ checksumsOff += 4;
+ if (calculated != stored) {
+ long errPos = basePos + dataPos - n;
+ throw new ChecksumException(
+ "Checksum error: "+ fileName + " at "+ errPos +
+ " exp: " + stored + " got: " + calculated, errPos);
+ }
+ }
+ }
+
+ /**
+ * Calculate checksums for the given data.
+ *
+ * The 'mark' of the ByteBuffer parameters may be modified by this function,
+ * but the position is maintained.
+ *
+ * @param data the ByteBuffer (direct or array-backed) holding the data to checksum.
+ * @param checksums the ByteBuffer (direct or array-backed) into which checksums
+ * will be stored. Enough space must be available in this
+ * buffer to put the checksums.
+ */
+ public void calculateChunkedSums(ByteBuffer data, ByteBuffer checksums) {
+   // With a checksum size of zero there is nothing to write.
+   if (size == 0) {
+     return;
+   }
+
+   // When both buffers expose a backing array, compute straight from/into
+   // the arrays instead of copying each chunk out of the buffers.
+   if (data.hasArray() && checksums.hasArray()) {
+     calculateChunkedSums(
+         data.array(), data.arrayOffset() + data.position(), data.remaining(),
+         checksums.array(), checksums.arrayOffset() + checksums.position());
+     return;
+   }
+
+   // Otherwise walk the buffers chunk by chunk, then rewind both through
+   // mark/reset so the caller sees the positions it passed in.
+   data.mark();
+   checksums.mark();
+   try {
+     byte[] buf = new byte[bytesPerChecksum];
+     byte[] sum = new byte[4];
+     while (data.remaining() > 0) {
+       // The final chunk may be shorter than bytesPerChecksum.
+       int n = Math.min(data.remaining(), bytesPerChecksum);
+       data.get(buf, 0, n);
+       summer.reset();
+       summer.update(buf, 0, n);
+       int calculated = (int) summer.getValue();
+       // Write the checksum most-significant byte first, independent of
+       // checksums.order(). putInt() would follow the buffer's byte order,
+       // while the array-backed path and verifyChunkedSums always use
+       // big-endian, so a LITTLE_ENDIAN buffer would produce sums that
+       // fail verification.
+       sum[0] = (byte) (calculated >>> 24);
+       sum[1] = (byte) (calculated >>> 16);
+       sum[2] = (byte) (calculated >>> 8);
+       sum[3] = (byte) calculated;
+       checksums.put(sum);
+     }
+   } finally {
+     data.reset();
+     checksums.reset();
+   }
+ }
+
+ /**
+ * Implementation of chunked calculation specifically on byte arrays. This
+ * is to avoid the copy when dealing with ByteBuffers that have array backing.
+ */
+ private void calculateChunkedSums(
+ byte[] data, int dataOffset, int dataLength,
+ byte[] sums, int sumsOffset) {
+
+ int remaining = dataLength;
+ while (remaining > 0) {
+ int n = Math.min(remaining, bytesPerChecksum);
+ summer.reset();
+ summer.update(data, dataOffset, n);
+ dataOffset += n;
+ remaining -= n;
+ long calculated = summer.getValue();
+ sums[sumsOffset++] = (byte) (calculated >> 24);
+ sums[sumsOffset++] = (byte) (calculated >> 16);
+ sums[sumsOffset++] = (byte) (calculated >> 8);
+ sums[sumsOffset++] = (byte) (calculated);
+ }
+ }
+
+
+ /**
 * This just provides a dummy implementation for Checksum class
* This is used when there is no checksum available or required for
* data
Added:
hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestDataChecksum.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestDataChecksum.java?rev=1146111&view=auto
==============================================================================
---
hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestDataChecksum.java
(added)
+++
hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestDataChecksum.java
Wed Jul 13 16:12:42 2011
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.hadoop.fs.ChecksumException;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestDataChecksum {
+
+  // The payload in each buffer is surrounded by a few padding bytes so the
+  // test also exercises buffer position/limit handling, not just offset 0.
+  private static final int SUMS_OFFSET_IN_BUFFER = 3;
+  private static final int DATA_OFFSET_IN_BUFFER = 3;
+  private static final int DATA_TRAILER_IN_BUFFER = 3;
+
+  private static final int BYTES_PER_CHUNK = 512;
+  private static final DataChecksum checksum =
+      DataChecksum.newDataChecksum(
+          DataChecksum.CHECKSUM_CRC32, BYTES_PER_CHUNK);
+
+  /**
+   * Runs the bulk calculate/verify round trip for lengths just below, at and
+   * just above two full chunks, with array-backed and then direct buffers.
+   */
+  @Test
+  public void testBulkOps() throws Exception {
+    final int[] lengths = {1023, 1024, 1025};
+    for (boolean useDirect : new boolean[]{false, true}) {
+      for (int length : lengths) {
+        doBulkTest(length, useDirect);
+      }
+    }
+  }
+
+  /**
+   * Calculates checksums over random data, then checks that verification
+   * passes, ignores changes to the padding bytes, and reports the expected
+   * position when the first or last stored checksum byte is altered.
+   */
+  private void doBulkTest(int dataLength, boolean useDirect)
+      throws Exception {
+    final String bufferKind = useDirect ? "direct" : "array-backed";
+    System.err.println("Testing bulk checksums of length " +
+        dataLength + " with " + bufferKind + " buffers");
+
+    // One checksum per chunk, rounding the chunk count up.
+    final int numSums = (dataLength - 1)/checksum.getBytesPerChecksum() + 1;
+    final int sumsLength = numSums * checksum.getChecksumSize();
+
+    final byte[] data = new byte[dataLength +
+        DATA_OFFSET_IN_BUFFER +
+        DATA_TRAILER_IN_BUFFER];
+    new Random().nextBytes(data);
+    ByteBuffer dataBuf = ByteBuffer.wrap(
+        data, DATA_OFFSET_IN_BUFFER, dataLength);
+
+    final byte[] checksums = new byte[SUMS_OFFSET_IN_BUFFER + sumsLength];
+    ByteBuffer checksumBuf = ByteBuffer.wrap(
+        checksums, SUMS_OFFSET_IN_BUFFER, sumsLength);
+
+    // Swap in direct copies of the buffers when requested.
+    if (useDirect) {
+      dataBuf = directify(dataBuf);
+      checksumBuf = directify(checksumBuf);
+    }
+
+    // Freshly calculated checksums must verify.
+    checksum.calculateChunkedSums(dataBuf, checksumBuf);
+    checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0);
+
+    // Altering the checksum header byte must not affect verification.
+    corruptBufferOffset(checksumBuf, 0);
+    checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0);
+
+    // Nor must altering the data header byte or the first trailer byte. The
+    // limit is raised by one only while the trailer byte is touched.
+    corruptBufferOffset(dataBuf, 0);
+    dataBuf.limit(dataBuf.limit() + 1);
+    corruptBufferOffset(dataBuf, dataLength + DATA_OFFSET_IN_BUFFER);
+    dataBuf.limit(dataBuf.limit() - 1);
+    checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0);
+
+    // A bad first checksum byte must be reported at position 0.
+    corruptBufferOffset(checksumBuf, SUMS_OFFSET_IN_BUFFER);
+    assertVerifyFailsAt(0, dataBuf, checksumBuf);
+
+    // Restore it, then break the last checksum byte: the error must be
+    // reported at the start of the final chunk.
+    uncorruptBufferOffset(checksumBuf, SUMS_OFFSET_IN_BUFFER);
+    corruptBufferOffset(checksumBuf, SUMS_OFFSET_IN_BUFFER + sumsLength - 1);
+    final int expectedPos = checksum.getBytesPerChecksum() * (numSums - 1);
+    assertVerifyFailsAt(expectedPos, dataBuf, checksumBuf);
+  }
+
+  /**
+   * Asserts that verification throws a ChecksumException whose position
+   * equals {@code expectedPos}.
+   */
+  private static void assertVerifyFailsAt(long expectedPos,
+      ByteBuffer dataBuf, ByteBuffer checksumBuf) {
+    try {
+      checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0);
+      fail("Did not throw on bad checksums");
+    } catch (ChecksumException ce) {
+      assertEquals(expectedPos, ce.getPos());
+    }
+  }
+
+  /** Increments the byte at the absolute index {@code offset} by one. */
+  private static void corruptBufferOffset(ByteBuffer buf, int offset) {
+    final byte current = buf.get(offset);
+    buf.put(offset, (byte)(current + 1));
+  }
+
+  /** Decrements the byte at the absolute index {@code offset} by one. */
+  private static void uncorruptBufferOffset(ByteBuffer buf, int offset) {
+    final byte current = buf.get(offset);
+    buf.put(offset, (byte)(current - 1));
+  }
+
+  /**
+   * Returns a direct buffer with the capacity, position and limit of the
+   * given buffer, holding a copy of the bytes between its position and limit.
+   * The relative put advances the source buffer's position.
+   */
+  private static ByteBuffer directify(ByteBuffer source) {
+    final ByteBuffer direct = ByteBuffer.allocateDirect(source.capacity());
+    direct.position(source.position());
+    // Mark the start so the position can be rewound after the bulk copy.
+    direct.mark();
+    direct.put(source);
+    direct.reset();
+    direct.limit(source.limit());
+    return direct;
+  }
+}