scwhittle commented on code in PR #34873:
URL: https://github.com/apache/beam/pull/34873#discussion_r2077109138
##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -840,4 +843,38 @@ public boolean equals(@Nullable Object other) {
public int hashCode() {
return Objects.hash(getClass(), typeDescriptor, datumFactory,
schemaSupplier.get());
}
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
Review Comment:
Actually, I think this could still lead to duplicate readers/writers if we
deserialize the same logical coder multiple times.
I think that could be fixed by using readResolve, which lets us replace the
deserialized object with another one (just learning about this myself).
https://docs.oracle.com/javase/8/docs/platform/serialization/spec/input.html#a5903
One idea that seems clean is to serialize the cache key for AvroCoder via
writeReplace, and then have that key's readResolve look up or create the
coder in the cache. There is an example of this pattern in this file with
SerializableSchemaString.
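To make the suggestion concrete, here is a minimal sketch of the writeReplace/readResolve pattern using only the JDK. It is not the AvroCoder code: `InternedCoder`, `Key`, `typeName`, `schemaJson`, and `roundTrip` are hypothetical names chosen for illustration, and the schema is represented as a plain string.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a cached coder; not the real AvroCoder.
final class InternedCoder implements Serializable {
  private static final long serialVersionUID = 1L;

  // One instance per logical key within this JVM.
  private static final ConcurrentHashMap<Key, InternedCoder> CACHE = new ConcurrentHashMap<>();

  // The only thing that is actually written to the stream.
  static final class Key implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String typeName;
    private final String schemaJson;

    Key(String typeName, String schemaJson) {
      this.typeName = typeName;
      this.schemaJson = schemaJson;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof Key)) {
        return false;
      }
      Key other = (Key) o;
      return typeName.equals(other.typeName) && schemaJson.equals(other.schemaJson);
    }

    @Override
    public int hashCode() {
      return Objects.hash(typeName, schemaJson);
    }

    // Called after the key is deserialized: swap it for the cached coder.
    private Object readResolve() {
      return InternedCoder.of(typeName, schemaJson);
    }
  }

  private final Key key;

  private InternedCoder(Key key) {
    this.key = key;
  }

  static InternedCoder of(String typeName, String schemaJson) {
    return CACHE.computeIfAbsent(new Key(typeName, schemaJson), InternedCoder::new);
  }

  // Called before serialization: write the key instead of the coder itself.
  private Object writeReplace() {
    return key;
  }

  // Helper for the demo: serialize and deserialize in memory.
  static InternedCoder roundTrip(InternedCoder coder) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(coder);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (InternedCoder) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

With this shape, deserializing the same logical coder any number of times in one JVM resolves to the single cached instance, so reader/writer state attached to it would not be duplicated.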
##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -242,7 +252,18 @@ public static <T> AvroCoder<T> of(Class<T> type, Schema
schema) {
* @param <T> the element type
*/
public static <T> AvroCoder<T> of(AvroDatumFactory<T> datumFactory, Schema
schema) {
- return new AvroCoder<>(datumFactory, schema);
+ Class<T> type = datumFactory.getType();
+ return fromCacheOrCreate(type, schema, () -> new AvroCoder<>(datumFactory,
schema));
+ }
+
+ private static <T> AvroCoder<T> fromCacheOrCreate(
+ Class<T> type, Schema schema, Callable<AvroCoder<T>> avroCoderCreator) {
Review Comment:
It might be safer to have this take a type enum as well (specific, reflect).
Otherwise the AvroCoder.specific() factory could return something previously
cached via AvroCoder.reflect(), and vice versa.
That might not be expected usage, but we might as well guard against it since
it would be pretty confusing.
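A rough sketch of what a kind-aware cache key could look like, using only the JDK. All names here (`KindAwareCoderCache`, `Kind`, `Key`, `schemaJson`) are hypothetical, and the cached value is left generic rather than being an actual AvroCoder.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical illustration; not the cache used in the PR.
final class KindAwareCoderCache {
  enum Kind {
    SPECIFIC,
    REFLECT
  }

  // The kind is part of the key, so specific and reflect entries never collide.
  static final class Key {
    private final Kind kind;
    private final Class<?> type;
    private final String schemaJson;

    Key(Kind kind, Class<?> type, String schemaJson) {
      this.kind = kind;
      this.type = type;
      this.schemaJson = schemaJson;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof Key)) {
        return false;
      }
      Key other = (Key) o;
      return kind == other.kind
          && type.equals(other.type)
          && schemaJson.equals(other.schemaJson);
    }

    @Override
    public int hashCode() {
      return Objects.hash(kind, type, schemaJson);
    }
  }

  private static final ConcurrentHashMap<Key, Object> CACHE = new ConcurrentHashMap<>();

  // Returns the cached value for (kind, type, schema), creating it on first use.
  // The unchecked cast assumes callers always use the same V for a given key.
  @SuppressWarnings("unchecked")
  static <V> V fromCacheOrCreate(
      Kind kind, Class<?> type, String schemaJson, Supplier<V> creator) {
    return (V) CACHE.computeIfAbsent(new Key(kind, type, schemaJson), k -> creator.get());
  }
}
```

Two lookups with the same type and schema but different kinds then produce distinct entries, while repeated lookups with the same kind return the same instance.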
##########
sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java:
##########
@@ -416,6 +417,40 @@ public void testReflectRecordEncoding() throws Exception {
CoderProperties.coderDecodeEncodeEqual(coderWithSchema,
AVRO_SPECIFIC_RECORD);
}
+ @Test
+ public void testCoderCached() {
+ Schema schema = AVRO_SPECIFIC_RECORD.getSchema();
+ TypeDescriptor<TestAvro> typeDescriptor =
TypeDescriptor.of(TestAvro.class);
+ Class<TestAvro> clazz = TestAvro.class;
+ boolean useReflectApi = false;
+ AvroDatumFactory<TestAvro> datumFactory = new
AvroDatumFactory.ReflectDatumFactory<>(clazz);
+
+ assertSame(AvroCoder.of(clazz), AvroCoder.of(clazz));
+ assertSame(AvroCoder.of(clazz, useReflectApi), AvroCoder.of(clazz,
useReflectApi));
+ assertSame(AvroCoder.of(typeDescriptor), AvroCoder.of(typeDescriptor));
+ assertSame(
+ AvroCoder.of(typeDescriptor, useReflectApi),
AvroCoder.of(typeDescriptor, useReflectApi));
+ assertSame(AvroCoder.of(clazz, schema), AvroCoder.of(clazz, schema));
+ assertSame(
+ AvroCoder.of(clazz, schema, useReflectApi), AvroCoder.of(clazz,
schema, useReflectApi));
+ assertSame(AvroCoder.of(datumFactory, schema), AvroCoder.of(datumFactory,
schema));
+
+ assertSame(AvroCoder.specific(clazz), AvroCoder.specific(clazz));
Review Comment:
Please also assert that AvroCoder.specific(clazz) is a different instance
from AvroCoder.reflect(clazz).
##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroGenericCoder.java:
##########
@@ -17,17 +17,27 @@
*/
package org.apache.beam.sdk.extensions.avro.coders;
+import java.util.concurrent.ExecutionException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
/** AvroCoder specialisation for GenericRecord, needed for cross-language
transforms. */
public class AvroGenericCoder extends AvroCoder<GenericRecord> {
+ private static final Cache<Schema, AvroGenericCoder> AVRO_GENERIC_CODER_CACHE =
+ CacheBuilder.newBuilder().build();
Review Comment:
Use the weakValues option? I'm not sure whether there are cases where schemas
are created dynamically, in which case this cache could grow without bound.
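For context, Guava's `CacheBuilder.weakValues()` stores values behind weak references so that entries whose values are no longer referenced elsewhere can be garbage collected. The sketch below imitates that idea with JDK classes only, to stay self-contained. It is not Guava's implementation, and `WeakValueCache` is a hypothetical name.

```java
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical JDK-only imitation of a weak-values cache; not Guava's code.
final class WeakValueCache<K, V> {
  private final ConcurrentHashMap<K, WeakReference<V>> map = new ConcurrentHashMap<>();

  // Returns the live cached value for the key, or loads and caches a new one.
  V get(K key, Function<K, V> loader) {
    // The holder keeps a strong reference so the value cannot be collected
    // between being stored weakly and being returned to the caller.
    Object[] holder = new Object[1];
    map.compute(
        key,
        (k, ref) -> {
          V existing = ref == null ? null : ref.get();
          if (existing != null) {
            holder[0] = existing;
            return ref;
          }
          V created = loader.apply(k);
          holder[0] = created;
          return new WeakReference<>(created);
        });
    @SuppressWarnings("unchecked")
    V result = (V) holder[0];
    return result;
  }

  // Note: cleared references stay in the map until the key is requested again.
  // A real implementation (such as Guava's) also evicts those stale entries.
  int size() {
    return map.size();
  }
}
```

As long as a caller holds the value strongly, repeated lookups return the same instance. Once nothing else references it, the value becomes eligible for collection, which is what keeps dynamically created schemas from pinning coders forever.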