This is an automated email from the ASF dual-hosted git repository.
upthewaterspout pushed a commit to branch feature/transcoding_experiments
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to
refs/heads/feature/transcoding_experiments by this push:
new e953f39 Adding a value serializer that sends protobuf structs but
caches strings
e953f39 is described below
commit e953f39555a762568a43f8b53056d2de973be038
Author: Dan Smith <[email protected]>
AuthorDate: Tue May 15 17:42:49 2018 -0700
Adding a value serializer that sends protobuf structs but caches strings
This CachingProtobufStructSerializer is just like the
ProtobufStructSerializer, but metadata like field names and type ids are
cached.
---
.../src/main/proto/v1/string_caching_struct.proto | 39 +++++
.../CachingProtobufStructSerializer.java | 174 +++++++++++++++++++++
...he.geode.protocol.serialization.ValueSerializer | 3 +-
.../serialization/CachingStructSerializerTest.java | 74 +++++++++
4 files changed, 289 insertions(+), 1 deletion(-)
diff --git
a/geode-protobuf-messages/src/main/proto/v1/string_caching_struct.proto
b/geode-protobuf-messages/src/main/proto/v1/string_caching_struct.proto
new file mode 100644
index 0000000..fd0fff2
--- /dev/null
+++ b/geode-protobuf-messages/src/main/proto/v1/string_caching_struct.proto
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+syntax = "proto3";
+
+option java_multiple_files = true;
+
+package org.apache.geode.internal.protocol.protobuf.v1;
+
+import "v1/basicTypes.proto";
+
+message CachedString { // A string paired with an integer id.
+ int32 id = 1; // Numeric handle for the string; proto3 default is 0 when unset.
+ string value = 2; // The string text; proto3 default is "" when unset.
+}
+
+message CachingStruct { // A named struct: a type name plus a list of fields.
+ CachedString typeName = 1; // Name of the struct's type, carried as a CachedString.
+ repeated Field fields = 2; // The struct's fields (zero or more).
+}
+
+message Field { // One named field of a CachingStruct.
+ CachedString fieldName = 1; // Name of the field, carried as a CachedString.
+ oneof value { // At most one of the following is set.
+ EncodedValue encodedValue = 2; // Non-struct value; EncodedValue is presumably declared in v1/basicTypes.proto (verify).
+ CachingStruct structValue = 3; // Nested struct value.
+ }
+}
diff --git
a/geode-protobuf/src/main/java/org/apache/geode/protocol/serialization/CachingProtobufStructSerializer.java
b/geode-protobuf/src/main/java/org/apache/geode/protocol/serialization/CachingProtobufStructSerializer.java
new file mode 100644
index 0000000..0249848
--- /dev/null
+++
b/geode-protobuf/src/main/java/org/apache/geode/protocol/serialization/CachingProtobufStructSerializer.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.protocol.serialization;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.NullValue;
+import com.google.protobuf.UnsafeByteOperations;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
+import org.apache.geode.internal.protocol.protobuf.v1.CachedString;
+import org.apache.geode.internal.protocol.protobuf.v1.CachingStruct;
+import org.apache.geode.internal.protocol.protobuf.v1.Field;
+import
org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.PdxInstanceFactory;
+
+/**
+ * A {@link ValueSerializer} that converts {@link PdxInstance}s to and from {@link CachingStruct}
+ * protobuf messages. Type names and field names travel as {@link CachedString}s, i.e. a string
+ * together with a numeric id assigned by this serializer.
+ *
+ * <p>
+ * The id tables are plain {@link HashMap}s with no synchronization, so an instance should not be
+ * shared between threads without external coordination.
+ */
+public class CachingProtobufStructSerializer implements ValueSerializer {
+  static final String PROTOBUF_STRUCT = "__PROTOBUF_STRUCT_AS_PDX";
+  /** Cache used to create PDX instance factories; assigned in {@link #init(Cache)}. */
+  private Cache cache;
+  /** Strings already written, mapped to the CachedString (id + value) that represents them. */
+  private final Map<String, CachedString> writeCache = new HashMap<>();
+  /** Strings already read, keyed by the id they arrived with. */
+  private final Map<Integer, String> readCache = new HashMap<>();
+
+  /**
+   * Serializes a PDX instance into the bytes of a {@link CachingStruct}.
+   *
+   * @param object the value to serialize; must be a {@link PdxInstance}
+   * @return the encoded struct
+   * @throws ClassCastException if {@code object} is not a {@link PdxInstance}
+   * @throws IllegalStateException if a field holds a value of an unsupported type
+   */
+  @Override
+  public ByteString serialize(Object object) throws IOException {
+    return serializeStruct(object).toByteString();
+  }
+
+  /**
+   * Builds the {@link CachingStruct} for a PDX instance, recursing into nested PDX instances.
+   *
+   * @param object the value to convert; must be a {@link PdxInstance}
+   * @return the struct message
+   */
+  CachingStruct serializeStruct(Object object) {
+    PdxInstance pdxInstance = (PdxInstance) object;
+
+    CachingStruct.Builder structBuilder = CachingStruct.newBuilder();
+    for (String fieldName : pdxInstance.getFieldNames()) {
+      structBuilder.addFields(serializeField(fieldName, pdxInstance.getField(fieldName)));
+    }
+
+    // The type name is registered after the fields; keep this order so ids are assigned in the
+    // same sequence as before.
+    structBuilder.setTypeName(cacheWrite(pdxInstance.getClassName()));
+
+    return structBuilder.build();
+  }
+
+  /**
+   * Converts one PDX field into a {@link Field} message. Nested PDX instances become nested
+   * structs; every other supported value becomes an {@code EncodedValue}.
+   */
+  private Field serializeField(String fieldName, Object value) {
+    Field.Builder builder = Field.newBuilder();
+    builder.setFieldName(cacheWrite(fieldName));
+    if (value instanceof PdxInstance) {
+      builder.setStructValue(serializeStruct(value));
+    } else {
+      builder.setEncodedValue(encodeValue(value));
+    }
+    return builder.build();
+  }
+
+  /**
+   * Wraps a non-struct field value in an {@code EncodedValue}.
+   *
+   * @throws IllegalStateException if the value's type is not one of the supported types
+   */
+  private static BasicTypes.EncodedValue encodeValue(Object value) {
+    BasicTypes.EncodedValue.Builder encoded = BasicTypes.EncodedValue.newBuilder();
+    if (value instanceof String) {
+      encoded.setStringResult((String) value);
+    } else if (value instanceof Boolean) {
+      encoded.setBooleanResult((Boolean) value);
+    } else if (value instanceof Integer) {
+      encoded.setIntResult((Integer) value);
+    } else if (value instanceof Byte) {
+      encoded.setByteResult((Byte) value);
+    } else if (value instanceof Long) {
+      encoded.setLongResult((Long) value);
+    } else if (value instanceof Double) {
+      encoded.setDoubleResult((Double) value);
+    } else if (value instanceof byte[]) {
+      // unsafeWrap shares the array instead of copying it.
+      encoded.setBinaryResult(UnsafeByteOperations.unsafeWrap((byte[]) value));
+    } else if (value == null) {
+      encoded.setNullResult(NullValue.NULL_VALUE);
+    } else {
+      throw new IllegalStateException(
+          "Don't know how to translate object of type " + value.getClass() + ": " + value);
+    }
+    return encoded.build();
+  }
+
+  /**
+   * Returns the {@link CachedString} for a string, assigning the next id (starting at 1) the first
+   * time the string is seen. The returned message always carries both the id and the value.
+   */
+  private CachedString cacheWrite(final String string) {
+    // The mapping function only reads the map's size; it never modifies the map.
+    return writeCache.computeIfAbsent(string,
+        name -> CachedString.newBuilder().setId(writeCache.size() + 1).setValue(name).build());
+  }
+
+  /**
+   * Parses the bytes of a {@link CachingStruct} and rebuilds the PDX instance it describes.
+   *
+   * @param bytes an encoded {@link CachingStruct}
+   * @return the reconstructed {@link PdxInstance}
+   * @throws IllegalStateException if a field carries a value that cannot be translated
+   */
+  @Override
+  public Object deserialize(ByteString bytes) throws IOException, ClassNotFoundException {
+    CachingStruct struct = CachingStruct.parseFrom(bytes);
+    return deserialize(struct);
+  }
+
+  /** Rebuilds a PDX instance from a struct message, recursing into nested structs. */
+  private Object deserialize(CachingStruct struct) {
+    String typeName = cacheRead(struct.getTypeName());
+    PdxInstanceFactory pdxInstanceFactory = cache.createPdxInstanceFactory(typeName);
+
+    for (Field field : struct.getFieldsList()) {
+      String fieldName = cacheRead(field.getFieldName());
+      writeField(pdxInstanceFactory, fieldName, deserializeField(field));
+    }
+
+    return pdxInstanceFactory.create();
+  }
+
+  /**
+   * Writes one decoded value into the factory using the writer that matches its runtime type.
+   *
+   * @throws IllegalStateException if the value's type is not one of the supported types
+   */
+  private static void writeField(PdxInstanceFactory pdxInstanceFactory, String fieldName,
+      Object value) {
+    if (value instanceof String) {
+      pdxInstanceFactory.writeString(fieldName, (String) value);
+    } else if (value instanceof Boolean) {
+      pdxInstanceFactory.writeBoolean(fieldName, (Boolean) value);
+    } else if (value instanceof Integer) {
+      pdxInstanceFactory.writeInt(fieldName, (Integer) value);
+    } else if (value instanceof Byte) {
+      pdxInstanceFactory.writeByte(fieldName, (Byte) value);
+    } else if (value instanceof Long) {
+      pdxInstanceFactory.writeLong(fieldName, (Long) value);
+    } else if (value instanceof byte[]) {
+      pdxInstanceFactory.writeByteArray(fieldName, (byte[]) value);
+    } else if (value instanceof Double) {
+      pdxInstanceFactory.writeDouble(fieldName, (Double) value);
+    } else if (value == null || value instanceof PdxInstance || value instanceof List) {
+      // Nested instances, lists and nulls are all stored as generic objects.
+      pdxInstanceFactory.writeObject(fieldName, value);
+    } else {
+      throw new IllegalStateException(
+          "Don't know how to translate object of type " + value.getClass() + ": " + value);
+    }
+  }
+
+  /**
+   * Resolves a {@link CachedString} to its text.
+   *
+   * <p>
+   * Generated proto3 getters never return {@code null}; an unset string field reads as
+   * {@code ""}. An empty value with a non-zero id is therefore treated as a reference to a string
+   * received earlier under that id. If no such string is known, the empty value itself is
+   * returned. Id 0 (the proto3 default, never assigned by {@link #cacheWrite(String)}) means the
+   * string is not cached and its value is returned as is.
+   */
+  private String cacheRead(final CachedString fieldName) {
+    String value = fieldName.getValue();
+    int id = fieldName.getId();
+    if (id == 0) {
+      return value;
+    }
+    if (value.isEmpty()) {
+      return readCache.getOrDefault(id, value);
+    }
+    readCache.put(id, value);
+    return value;
+  }
+
+  /**
+   * Decodes the value carried by a field: either an encoded value or a nested struct.
+   *
+   * @throws IllegalStateException if neither kind of value is set
+   */
+  private Object deserializeField(Field value) {
+    switch (value.getValueCase()) {
+      case ENCODEDVALUE:
+        return new ProtobufSerializationService().decode(value.getEncodedValue());
+      case STRUCTVALUE:
+        return deserialize(value.getStructValue());
+      default:
+        throw new IllegalStateException(
+            "Don't know how to translate object of type " + value.getValueCase() + ": " + value);
+    }
+  }
+
+  /**
+   * Stores the cache later used to create PDX instance factories during deserialization.
+   *
+   * @param cache the cache to use
+   */
+  @Override
+  public void init(Cache cache) {
+    this.cache = cache;
+  }
+}
diff --git
a/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.protocol.serialization.ValueSerializer
b/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.protocol.serialization.ValueSerializer
index 5d487dd..0866211 100644
---
a/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.protocol.serialization.ValueSerializer
+++
b/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.protocol.serialization.ValueSerializer
@@ -1,3 +1,4 @@
org.apache.geode.protocol.serialization.ProtobufStructSerializer
org.apache.geode.protocol.serialization.CompressingProtobufStructSerializer
-org.apache.geode.protocol.serialization.PdxPassThroughSerializer
\ No newline at end of file
+org.apache.geode.protocol.serialization.PdxPassThroughSerializer
+org.apache.geode.protocol.serialization.CachingProtobufStructSerializer
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/protocol/serialization/CachingStructSerializerTest.java
b/geode-protobuf/src/test/java/org/apache/geode/protocol/serialization/CachingStructSerializerTest.java
new file mode 100644
index 0000000..05b5342
--- /dev/null
+++
b/geode-protobuf/src/test/java/org/apache/geode/protocol/serialization/CachingStructSerializerTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.protocol.serialization;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import com.google.protobuf.ByteString;
+import com.pholser.junit.quickcheck.From;
+import com.pholser.junit.quickcheck.Property;
+import com.pholser.junit.quickcheck.runner.JUnitQuickcheck;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+/**
+ * Property-based round-trip test for {@link CachingProtobufStructSerializer}: a generated
+ * {@link PdxInstance} is serialized, deserialized, and compared with the original.
+ */
+@RunWith(JUnitQuickcheck.class)
+@Category(IntegrationTest.class)
+public class CachingStructSerializerTest {
+
+  private CachingProtobufStructSerializer serializer;
+  private static Cache cache;
+
+  /** Creates the cache shared by all tests in this class. */
+  @BeforeClass
+  public static void createCache() {
+    cache = new CacheFactory().set(ConfigurationProperties.LOG_LEVEL, "error")
+        .setPdxReadSerialized(true).create();
+  }
+
+  /** Creates and initializes the serializer under test. */
+  @Before
+  public void createSerializer() {
+    serializer = new CachingProtobufStructSerializer();
+    serializer.init(cache);
+  }
+
+  /** Closes the shared cache. */
+  @AfterClass
+  public static void tearDown() {
+    cache.close();
+  }
+
+  /** Serializing and then deserializing must yield an instance equal to the original. */
+  @Property(trials = 10)
+  public void testSymmetry(
+      @PdxInstanceGenerator.ClassName("someclass")
+      @PdxInstanceGenerator.FieldTypes({String.class, int.class, long.class, byte.class,
+          byte[].class, double.class, PdxInstance.class})
+      @From(PdxInstanceGenerator.class) PdxInstance original)
+      throws IOException, ClassNotFoundException {
+    ByteString bytes = serializer.serialize(original);
+    PdxInstance actual = (PdxInstance) serializer.deserialize(bytes);
+    // The value under test goes on the "actual" side so failure messages label it correctly.
+    // The two assertions are both kept so that equality is compared from each side.
+    assertThat(actual).isEqualTo(original);
+    assertEquals(original, actual);
+  }
+}
--
To stop receiving notification emails like this one, please contact
[email protected].