Author: kturner
Date: Mon Feb 11 22:31:50 2013
New Revision: 1444984

URL: http://svn.apache.org/r1444984
Log:
ACCUMULO-1010 fixed bug in mutation that was preventing upgrade of accumulo 1.4 to 1.5.

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/Mutation.java accumulo/trunk/core/src/test/java/org/apache/accumulo/core/data/MutationTest.java accumulo/trunk/server/src/main/java/org/apache/accumulo/server/data/ServerMutation.java accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/Mutation.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/Mutation.java?rev=1444984&r1=1444983&r2=1444984&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/Mutation.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/Mutation.java Mon Feb 11 22:31:50 2013 @@ -468,35 +468,6 @@ public class Mutation implements Writabl } private ColumnUpdate deserializeColumnUpdate(SimpleReader in) { - if (useOldDeserialize) - return oldDeserializeColumnUpdate(in); - return newDeserializeColumnUpdate(in); - } - - private ColumnUpdate oldDeserializeColumnUpdate(SimpleReader in) { - byte[] cf = oldReadBytes(in); - byte[] cq = oldReadBytes(in); - byte[] cv = oldReadBytes(in); - boolean hasts = in.readBoolean(); - long ts = in.readLong(); - boolean deleted = in.readBoolean(); - - byte[] val; - int valLen = in.readInt(); - - if (valLen < 0) { - val = values.get((-1 * valLen) - 1); - } else if (valLen == 0) { - val = EMPTY_BYTES; - } else { - val = new byte[valLen]; - in.readBytes(val); - } - - return newColumnUpdate(cf, cq, cv, hasts, ts, deleted, val); - } - - private ColumnUpdate newDeserializeColumnUpdate(SimpleReader in) { byte[] cf = readBytes(in); byte[] cq = readBytes(in); byte[] cv = readBytes(in); @@ -557,11 +528,6 @@ public class Mutation implements Writabl @Override public void readFields(DataInput in) throws IOException { - byte first = in.readByte(); - if ((first & 
0x80) != 0x80) { - oldReadFields(first, in); - return; - } // Clear out cached column updates and value lengths so // that we recalculate them based on the (potentially) new @@ -569,7 +535,15 @@ public class Mutation implements Writabl updates = null; cachedValLens = -1; buffer = null; + useOldDeserialize = false; + byte first = in.readByte(); + if ((first & 0x80) != 0x80) { + oldReadFields(first, in); + useOldDeserialize = true; + return; + } + int len = WritableUtils.readVInt(in); row = new byte[len]; in.readFully(row); @@ -593,14 +567,10 @@ public class Mutation implements Writabl } } - public void oldReadFields(byte first, DataInput in) throws IOException { - // Clear out cached column updates and value lengths so - // that we recalculate them based on the (potentially) new - // data we are about to read in. - useOldDeserialize = true; - updates = null; - cachedValLens = -1; - buffer = null; + protected void droppingOldTimestamp(long ts) {} + + private void oldReadFields(byte first, DataInput in) throws IOException { + byte b = (byte)in.readByte(); byte c = (byte)in.readByte(); byte d = (byte)in.readByte(); @@ -610,27 +580,56 @@ public class Mutation implements Writabl row = new byte[len]; in.readFully(row); len = in.readInt(); - data = new byte[len]; - in.readFully(data); - entries = in.readInt(); + byte[] localData = new byte[len]; + in.readFully(localData); + int localEntries = in.readInt(); + List<byte[]> localValues; boolean valuesPresent = in.readBoolean(); if (!valuesPresent) { - values = null; + localValues = null; } else { - values = new ArrayList<byte[]>(); + localValues = new ArrayList<byte[]>(); int numValues = in.readInt(); for (int i = 0; i < numValues; i++) { len = in.readInt(); byte val[] = new byte[len]; in.readFully(val); - values.add(val); + localValues.add(val); } } - } - + + // convert data to new format + SimpleReader din = new SimpleReader(localData); + buffer = new ByteBuffer(); + for (int i = 0; i < localEntries; i++) { + byte[] cf = 
oldReadBytes(din); + byte[] cq = oldReadBytes(din); + byte[] cv = oldReadBytes(din); + boolean hasts = din.readBoolean(); + long ts = din.readLong(); + boolean deleted = din.readBoolean(); + + byte[] val; + int valLen = din.readInt(); + + if (valLen < 0) { + val = localValues.get((-1 * valLen) - 1); + } else if (valLen == 0) { + val = EMPTY_BYTES; + } else { + val = new byte[valLen]; + din.readBytes(val); + } + + put(cf, cq, cv, hasts, ts, deleted, val); + if (!hasts) + droppingOldTimestamp(ts); + } - + serialize(); + + } @Override public void write(DataOutput out) throws IOException { @@ -640,6 +639,7 @@ public class Mutation implements Writabl WritableUtils.writeVInt(out, row.length); out.write(row); + WritableUtils.writeVInt(out, data.length); out.write(data); WritableUtils.writeVInt(out, entries); @@ -691,7 +691,7 @@ public class Mutation implements Writabl return new TMutation(java.nio.ByteBuffer.wrap(row), java.nio.ByteBuffer.wrap(data), ByteBufferUtil.toByteBuffers(values), entries); } - public SERIALIZED_FORMAT getSerializedFormat() { + protected SERIALIZED_FORMAT getSerializedFormat() { return this.useOldDeserialize ? 
SERIALIZED_FORMAT.VERSION1 : SERIALIZED_FORMAT.VERSION2; } Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/data/MutationTest.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/data/MutationTest.java?rev=1444984&r1=1444983&r2=1444984&view=diff ============================================================================== --- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/data/MutationTest.java (original) +++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/data/MutationTest.java Mon Feb 11 22:31:50 2013 @@ -444,7 +444,7 @@ public class MutationTest extends TestCa assertEquals(3, m1.size()); assertEquals(m1.getUpdates().get(0), "cf1", "cq1", "", 0l, false, false, "v1"); assertEquals(m1.getUpdates().get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2"); - assertEquals(m2.getUpdates().get(2), "cf3", "cq3", "", 0l, false, true, ""); + assertEquals(m1.getUpdates().get(2), "cf3", "cq3", "", 0l, false, true, ""); Text exampleRow = new Text(" 123456789 123456789 123456789 123456789 123456789"); int exampleLen = exampleRow.getLength(); @@ -473,4 +473,39 @@ public class MutationTest extends TestCa } + public void testReserialize() throws Exception { + // test reading in a new mutation from an old mutation and reserializing the new mutation... 
this was failing + OldMutation om = new OldMutation("r1"); + om.put("cf1", "cq1", "v1"); + om.put("cf2", "cq2", new ColumnVisibility("cv2"), "v2"); + om.putDelete("cf3", "cq3"); + StringBuilder bigVal = new StringBuilder(); + for (int i = 0; i < 100000; i++) { + bigVal.append('a'); + } + om.put("cf2", "big", bigVal); + + + Mutation m1 = convert(om); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + m1.write(dos); + dos.close(); + + Mutation m2 = new Mutation(); + + ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); + DataInputStream dis = new DataInputStream(bis); + m2.readFields(dis); + + assertEquals("r1", new String(m1.getRow())); + assertEquals(4, m2.getUpdates().size()); + assertEquals(4, m2.size()); + assertEquals(m2.getUpdates().get(0), "cf1", "cq1", "", 0l, false, false, "v1"); + assertEquals(m2.getUpdates().get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2"); + assertEquals(m2.getUpdates().get(2), "cf3", "cq3", "", 0l, false, true, ""); + assertEquals(m2.getUpdates().get(3), "cf2", "big", "", 0l, false, false, bigVal.toString()); + } + } Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/data/ServerMutation.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/data/ServerMutation.java?rev=1444984&r1=1444983&r2=1444984&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/data/ServerMutation.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/data/ServerMutation.java Mon Feb 11 22:31:50 2013 @@ -16,8 +16,6 @@ */ package org.apache.accumulo.server.data; -import static org.apache.accumulo.core.data.Mutation.SERIALIZED_FORMAT.VERSION2; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -45,21 +43,16 @@ public class ServerMutation extends 
Muta public ServerMutation() { } + protected void droppingOldTimestamp(long ts) { + this.systemTime = ts; + } + @Override public void readFields(DataInput in) throws IOException { super.readFields(in); // new format writes system time with the mutation - if (getSerializedFormat() == VERSION2) + if (getSerializedFormat() == SERIALIZED_FORMAT.VERSION2) systemTime = WritableUtils.readVLong(in); - else { - // old format stored it in the timestamp of each mutation - for (ColumnUpdate upd : getUpdates()) { - if (!upd.hasTimestamp()) { - systemTime = upd.getTimestamp(); - break; - } - } - } } @Override Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java?rev=1444984&r1=1444983&r2=1444984&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java Mon Feb 11 22:31:50 2013 @@ -222,8 +222,8 @@ public class DfsLogger { try { byte[] magic = LOG_FILE_HEADER_V2.getBytes(); byte[] buffer = new byte[magic.length]; - int read = file.read(buffer); - if (read == magic.length && Arrays.equals(buffer, magic)) { + file.readFully(buffer); + if (Arrays.equals(buffer, magic)) { int count = file.readInt(); for (int i = 0; i < count; i++) { String key = file.readUTF();