HBASE-14076 ResultSerialization and MutationSerialization can throw InvalidProtocolBufferException when serializing a cell larger than 64MB
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7ddae393 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7ddae393 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7ddae393 Branch: refs/heads/master Commit: 7ddae3939e38ee4a910eef63c051c9d470d32629 Parents: d1f7bcb Author: Esteban Gutierrez <[email protected]> Authored: Tue Jul 14 12:53:42 2015 -0700 Committer: Esteban Gutierrez <[email protected]> Committed: Fri Jul 17 09:23:13 2015 -0700 ---------------------------------------------------------------------- .../hbase/mapreduce/MutationSerialization.java | 6 +- .../hbase/mapreduce/ResultSerialization.java | 4 +- .../hbase/mapreduce/TestSerialization.java | 129 +++++++++++++++++++ 3 files changed, 136 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/7ddae393/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java index b15b513..4d200e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java @@ -25,7 +25,7 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.io.serializer.Deserializer; import org.apache.hadoop.io.serializer.Serialization; @@ -57,7 +57,9 @@ public class MutationSerialization implements Serialization<Mutation> { @Override public Mutation deserialize(Mutation mutation) throws IOException { - MutationProto proto = MutationProto.parseDelimitedFrom(in); + ClientProtos.MutationProto.Builder builder = ClientProtos.MutationProto.newBuilder(); + ProtobufUtil.mergeDelimitedFrom(builder, in); + ClientProtos.MutationProto proto = builder.build(); return ProtobufUtil.toMutation(proto); } http://git-wip-us.apache.org/repos/asf/hbase/blob/7ddae393/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java index ebd3664..19b12c5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java @@ -125,7 +125,9 @@ public class ResultSerialization extends Configured implements Serialization<Res @Override public Result deserialize(Result mutation) throws IOException { - ClientProtos.Result proto = ClientProtos.Result.parseDelimitedFrom(in); + ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder(); + ProtobufUtil.mergeDelimitedFrom(builder, in); + ClientProtos.Result proto = builder.build(); return ProtobufUtil.toResult(proto); } http://git-wip-us.apache.org/repos/asf/hbase/blob/7ddae393/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSerialization.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSerialization.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSerialization.java new file mode 100644 index 0000000..162dd85 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSerialization.java @@ -0,0 +1,129 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + + +import com.google.protobuf.InvalidProtocolBufferException; +import junit.framework.AssertionFailedError; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.experimental.categories.Category; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.google.common.collect.Lists; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Result; + +import org.apache.hadoop.hbase.mapreduce.MutationSerialization; +import org.apache.hadoop.hbase.mapreduce.ResultSerialization; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.Serializer; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +@Category({MapReduceTests.class, SmallTests.class}) +public class TestSerialization { + @Rule public TestName name = new TestName(); + private static final Log LOG = LogFactory.getLog(TestSerialization.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static Configuration conf; + private static final byte [] row = Bytes.toBytes("row1"); + private static final byte [] qualifier = Bytes.toBytes("qualifier1"); + private static final byte [] family = Bytes.toBytes("family1"); + private static final byte [] value = new byte[100 * 1024 * 1024]; + + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf = TEST_UTIL.getConfiguration(); + conf.setInt("hbase.client.keyvalue.maxsize", Integer.MAX_VALUE); + //TEST_UTIL.startMiniCluster(); + } + + @Test + public void testLargeMutation() + throws Exception { + Put put = new Put(row); + put.add(family, qualifier, value); + + MutationSerialization serialization = new MutationSerialization(); + Serializer<Mutation> serializer = serialization.getSerializer(Mutation.class); + Deserializer<Mutation> deserializer = serialization.getDeserializer(Mutation.class); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + ByteArrayInputStream is = null; + try { + serializer.open(os); + serializer.serialize(put); + os.flush(); + is = new ByteArrayInputStream(os.toByteArray()); + deserializer.open(is); + deserializer.deserialize(null); + } catch (InvalidProtocolBufferException e) { + assertTrue("Got InvalidProtocolBufferException in " + name.getMethodName(), + e.getCause() instanceof InvalidProtocolBufferException); + } catch (Exception e) { + fail("Got an invalid exception: " + e); + } + } + @Test + public void testLargeResult() + throws Exception { + Result res = Result.create(new KeyValue[] {new KeyValue(row, family, qualifier, 0L, value)}); + + ResultSerialization serialization = new ResultSerialization(); + Serializer<Result> serializer = serialization.getSerializer(Result.class); + Deserializer<Result> deserializer = serialization.getDeserializer(Result.class); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + ByteArrayInputStream is = null; + try { + serializer.open(os); + serializer.serialize(res); + os.flush(); + is = new ByteArrayInputStream(os.toByteArray()); + deserializer.open(is); + deserializer.deserialize(null); + } catch (InvalidProtocolBufferException e) { + assertTrue("Got InvalidProtocolBufferException in " + name.getMethodName(), + e.getCause() instanceof InvalidProtocolBufferException); + } catch (Exception e) { + fail("Got an invalid exception: " + e); + } + } + private static String getName() { + return "TestSerialization"; + } +}
