mosche commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1211629393
##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java:
##########
@@ -20,38 +20,41 @@
import org.apache.avro.Schema;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
+import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.StringUtils;
/** A {@link CloudObjectTranslator} for {@link AvroCoder}. */
@SuppressWarnings({
"rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
})
class AvroCoderCloudObjectTranslator implements
CloudObjectTranslator<AvroCoder> {
- private static final String TYPE_FIELD = "type";
+ private static final String DATUM_FACTORY_FIELD = "datum_factory";
private static final String SCHEMA_FIELD = "schema";
- private static final String REFLECT_API_FIELD = "reflect_api";
@Override
public CloudObject toCloudObject(AvroCoder target, SdkComponents
sdkComponents) {
CloudObject base = CloudObject.forClass(AvroCoder.class);
+ byte[] serializedDatumFactory =
+ SerializableUtils.serializeToByteArray(target.getDatumFactory());
+ Structs.addString(
+ base, DATUM_FACTORY_FIELD,
StringUtils.byteArrayToJsonString(serializedDatumFactory));
Structs.addString(base, SCHEMA_FIELD, target.getSchema().toString());
- Structs.addString(base, TYPE_FIELD, target.getType().getName());
- Structs.addBoolean(base, REFLECT_API_FIELD, target.useReflectApi());
return base;
}
@Override
public AvroCoder<?> fromCloudObject(CloudObject cloudObject) {
Review Comment:
Not sure how much of an issue this really is; I'm not too familiar with
Dataflow. I suspect changing the CloudObject payload will be problematic for
Dataflow streaming pipelines when upgrading Beam if the payload is persisted
somewhere. It might be necessary to support constructing the AvroCoder from a
CloudObject created with a previous version of Beam, roughly as sketched below.
@kennknowles could you answer this, or ping someone who's in a position to
answer?
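Something along these lines might work for a backward-compatible `fromCloudObject` (a sketch only: it assumes `Structs` has default-value overloads of `getString`/`getBoolean`, and that `AvroCoder.of(AvroDatumFactory, Schema)` is the new factory method; the legacy branch mirrors the old `type`/`reflect_api` fields):

```java
@Override
public AvroCoder<?> fromCloudObject(CloudObject cloudObject) {
  Schema schema = new Schema.Parser().parse(Structs.getString(cloudObject, SCHEMA_FIELD));
  String serializedFactory = Structs.getString(cloudObject, DATUM_FACTORY_FIELD, null);
  if (serializedFactory != null) {
    // New payload: rebuild the coder from the serialized datum factory.
    AvroDatumFactory<?> datumFactory =
        (AvroDatumFactory<?>)
            SerializableUtils.deserializeFromByteArray(
                StringUtils.jsonStringToByteArray(serializedFactory), "AvroDatumFactory");
    return AvroCoder.of(datumFactory, schema);
  }
  // Legacy payload written by an older version of Beam: fall back to type + reflect_api.
  try {
    Class<?> type = Class.forName(Structs.getString(cloudObject, "type"));
    boolean useReflectApi = Structs.getBoolean(cloudObject, "reflect_api", false);
    return AvroCoder.of(type, schema, useReflectApi);
  } catch (ClassNotFoundException e) {
    throw new IllegalArgumentException("Unable to load coder type", e);
  }
}
```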
##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java:
##########
@@ -702,6 +739,16 @@ public Read<T> withBeamSchemas(boolean withBeamSchemas) {
return toBuilder().setInferBeamSchema(withBeamSchemas).build();
}
+ /** Sets a coder for the result of the read function. */
+ public Read<T> withCoder(Coder<T> coder) {
Review Comment:
Just fyi, usage of `withCoder` is [discouraged](https://beam.apache.org/documentation/io/io-standards/#classes--methods--properties) based on some fairly recent discussions on IO standards.
> Strongly Discouraged
>
> Sets the coder to use to encode/decode the element type of the output / input PCollection of this connector. In general, it is recommended that sources will:
>
> 1. Return Row objects with a schema that is automatically inferred.
> 2. Automatically set the necessary coder by having fixed output/input types, or inferring their output/input types.
>
> If neither 1 and 2 are possible, then a withCoder(...) method can be added.
I suppose you'd like a way of creating a custom coder from your own
AvroDatumFactory? What if Read could be configured with an AvroDatumFactory
instead of the DatumReaderFactory, roughly as sketched below?
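A rough sketch of that alternative, assuming the AutoValue builder keeps a datum reader factory (`withDatumFactory` and `setDatumReaderFactory` are hypothetical names here):

```java
/** Hypothetical alternative to withCoder: configure the read with an AvroDatumFactory. */
public Read<T> withDatumFactory(AvroDatumFactory<T> datumFactory) {
  // AvroDatumFactory implements AvroSource.DatumReaderFactory<T>, so it can back the read.
  return toBuilder().setDatumReaderFactory(datumFactory).build();
}
```

Since the factory also knows the element type, the output coder could then be inferred internally in `expand`, e.g. something like `AvroCoder.of(datumFactory, schema)`, which keeps `withCoder` off the public surface.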
##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -768,53 +695,22 @@ private static Field getField(Class<?> originalClazz,
String name) {
}
}
+ /**
+ * @return {@code true} if the two {@link AvroCoder} instances have the same
class, type and
+ * schema.
+ */
@Override
public boolean equals(@Nullable Object other) {
- if (other == this) {
- return true;
- }
- if (!(other instanceof AvroCoder)) {
+ if (other == null || this.getClass() != other.getClass()) {
return false;
}
AvroCoder<?> that = (AvroCoder<?>) other;
return Objects.equals(this.schemaSupplier.get(), that.schemaSupplier.get())
- && Objects.equals(this.typeDescriptor, that.typeDescriptor)
- && this.useReflectApi == that.useReflectApi;
+ && Objects.equals(this.typeDescriptor, that.typeDescriptor);
}
@Override
public int hashCode() {
- return Objects.hash(schemaSupplier.get(), typeDescriptor, useReflectApi);
- }
-
- /**
- * Conversion for DateTime.
- *
- * <p>This is a copy from Avro 1.8's TimestampConversion, which is renamed
in Avro 1.9. Defining
- * own copy gives flexibility for Beam Java SDK to work with Avro 1.8 and
1.9 at runtime.
- *
- * @see <a href="https://issues.apache.org/jira/browse/BEAM-9144">BEAM-9144:
Beam's own Avro
- * TimeConversion class in beam-sdk-java-core</a>
- */
- public static class JodaTimestampConversion extends Conversion<DateTime> {
Review Comment:
Thanks, this looks great!
##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -306,41 +357,34 @@ public ReflectData get() {
private final EmptyOnDeserializationThreadLocal<DatumWriter<T>> writer;
private final EmptyOnDeserializationThreadLocal<DatumReader<T>> reader;
- // Lazily re-instantiated after deserialization
- private final Supplier<ReflectData> reflectData;
-
protected AvroCoder(Class<T> type, Schema schema) {
- this(type, schema, false);
+ this(type, schema, true);
Review Comment:
Is this swapped intentionally? This could silently change the datum factory
for SpecificRecord, right?
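If it is intentional, a small regression test could make the intended default explicit; a sketch, assuming `getDatumFactory()` is public and using a hypothetical generated class `TestSpecificRecord`:

```java
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;

import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory;
import org.junit.Test;

public class AvroCoderDefaultFactoryTest {
  @Test
  public void specificRecordStillUsesSpecificDatumFactory() {
    // TestSpecificRecord stands in for any Avro-generated SpecificRecord class.
    AvroCoder<TestSpecificRecord> coder = AvroCoder.of(TestSpecificRecord.class);
    // If the constructor default silently switches to the reflect-based factory,
    // this assertion starts failing.
    assertThat(coder.getDatumFactory(), instanceOf(AvroDatumFactory.SpecificDatumFactory.class));
  }
}
```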
##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java:
##########
@@ -161,6 +152,25 @@ public class AvroUtils {
new ForLoadedType(ReadableInstant.class);
private static final ForLoadedType JODA_INSTANT = new
ForLoadedType(Instant.class);
+ public static void addLogicalTypeConversions(final GenericData data) {
Review Comment:
Wondering, should these conversions all be singletons?
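For illustration, a singleton-style variant might look roughly like this (constant names are made up; the conversion classes are the ones registered by this method):

```java
// Conversions are stateless, so shared instances can be registered instead of
// allocating new ones on every call.
private static final Conversion<DateTime> JODA_TIMESTAMP_CONVERSION =
    new AvroJodaTimeConversions.TimestampConversion();
private static final Conversion<LocalDate> JODA_DATE_CONVERSION =
    new AvroJodaTimeConversions.DateConversion();
// ... one shared instance per remaining conversion.

public static void addLogicalTypeConversions(final GenericData data) {
  data.addLogicalTypeConversion(JODA_TIMESTAMP_CONVERSION);
  data.addLogicalTypeConversion(JODA_DATE_CONVERSION);
  // ... register the rest the same way.
}
```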
##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroDatumFactory.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.io;
+
+import java.util.Objects;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Create {@link DatumReader} and {@link DatumWriter} for given schemas. */
+public abstract class AvroDatumFactory<T>
+ implements AvroSource.DatumReaderFactory<T>,
AvroSink.DatumWriterFactory<T> {
+
+ protected final Class<T> type;
+
+ public AvroDatumFactory(Class<T> type) {
+ this.type = type;
+ }
+
+ @Override
+ public boolean equals(@Nullable Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+ AvroDatumFactory<?> that = (AvroDatumFactory<?>) other;
+ return Objects.equals(type, that.type);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getClass(), type);
+ }
+
+ /** Specialized {@link AvroDatumFactory} for {@link GenericRecord}. */
+ public static class GenericDatumFactory extends
AvroDatumFactory<GenericRecord> {
+
+ public static final GenericDatumFactory INSTANCE = new
GenericDatumFactory();
+
+ public GenericDatumFactory() {
+ super(GenericRecord.class);
+ }
+
+ @Override
+ public DatumReader<GenericRecord> apply(Schema writer, Schema reader) {
+ return new GenericDatumReader<>(writer, reader);
+ }
+
+ @Override
+ public DatumWriter<GenericRecord> apply(Schema writer) {
+ return new GenericDatumWriter<>(writer);
+ }
+ }
+
+ /** Specialized {@link AvroDatumFactory} for {@link
org.apache.avro.specific.SpecificRecord}. */
+ public static class SpecificDatumFactory<T> extends AvroDatumFactory<T> {
+ SpecificDatumFactory(Class<T> type) {
+ super(type);
+ }
+
+ @Override
+ public DatumReader<T> apply(Schema writer, Schema reader) {
+ // create the datum writer using the Class<T> api.
+ // avro will load the proper class loader and when using avro 1.9
+ // the proper data with conversions (SpecificData.getForClass)
+ SpecificDatumReader<T> datumReader = new
SpecificDatumReader<>(this.type);
+ datumReader.setExpected(reader);
+ datumReader.setSchema(writer);
Review Comment:
Thanks for the clarification @RustedBones 👍 Would it make sense to add a
test case to verify the behavior?
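If you do, something along these lines could be a starting point (a sketch from a test in the same package, since the constructor is package-private; it assumes Avro 1.9+, where `SpecificData.getForClass` returns the generated class's `MODEL$`, and `TestSpecificRecord` is again a hypothetical generated class):

```java
@Test
public void specificDatumFactoryUsesModelOfGeneratedClass() {
  AvroDatumFactory<TestSpecificRecord> factory =
      new AvroDatumFactory.SpecificDatumFactory<>(TestSpecificRecord.class);
  Schema schema = TestSpecificRecord.getClassSchema();

  GenericDatumReader<TestSpecificRecord> reader =
      (GenericDatumReader<TestSpecificRecord>) factory.apply(schema, schema);

  // The reader should carry the SpecificData model of the generated class,
  // including the logical-type conversions registered on it.
  assertSame(SpecificData.getForClass(TestSpecificRecord.class), reader.getData());
}
```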
##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java:
##########
@@ -839,21 +894,29 @@ public ReadFiles<T> withBeamSchemas(boolean
withBeamSchemas) {
return toBuilder().setInferBeamSchema(withBeamSchemas).build();
}
+ /** Sets a coder for the result of the read function. */
+ public ReadFiles<T> withCoder(Coder<T> coder) {
Review Comment:
Same as above, wondering if there are alternatives.
##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroJodaTimeConversions.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.schemas.utils;
+
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
+import org.apache.avro.Schema;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+
+/**
+ * Avro 1.8 & 1.9 ship joda time conversions.
+ *
+ * <p>Since avro 1.10, only java time conversions are included. As beam is
still joda time based,
+ * and user may work with avro joda time generated classes, Provide joda time
logical conversions.
+ *
+ * <p>This code is copied from avro 1.8.2 TimeConversions.
+ */
+public class AvroJodaTimeConversions {
+
+ public static class DateConversion extends Conversion<LocalDate> {
+ private static final LocalDate EPOCH_DATE = new LocalDate(1970, 1, 1);
+
+ @Override
+ public Class<LocalDate> getConvertedType() {
+ return LocalDate.class;
+ }
+
+ @Override
+ public String getLogicalTypeName() {
+ return "date";
+ }
+
+ @Override
+ public LocalDate fromInt(Integer daysFromEpoch, Schema schema, LogicalType
type) {
+ return EPOCH_DATE.plusDays(daysFromEpoch);
+ }
+
+ @Override
+ public Integer toInt(LocalDate date, Schema schema, LogicalType type) {
+ return Days.daysBetween(EPOCH_DATE, date).getDays();
+ }
+ }
+
+ public static class TimeConversion extends Conversion<LocalTime> {
+ @Override
+ public Class<LocalTime> getConvertedType() {
+ return LocalTime.class;
+ }
+
+ @Override
+ public String getLogicalTypeName() {
+ return "time-millis";
+ }
+
+ @Override
+ public LocalTime fromInt(Integer millisFromMidnight, Schema schema,
LogicalType type) {
+ return LocalTime.fromMillisOfDay(millisFromMidnight);
+ }
+
+ @Override
+ public Integer toInt(LocalTime time, Schema schema, LogicalType type) {
+ return time.millisOfDay().get();
+ }
+ }
+
+ public static class TimeMicrosConversion extends Conversion<LocalTime> {
+ @Override
+ public Class<LocalTime> getConvertedType() {
+ return LocalTime.class;
+ }
+
+ @Override
+ public String getLogicalTypeName() {
+ return "time-micros";
+ }
+
+ @Override
+ public LocalTime fromLong(Long microsFromMidnight, Schema schema,
LogicalType type) {
+ return LocalTime.fromMillisOfDay(microsFromMidnight / 1000);
+ }
+ }
+
+ public static class LossyTimeMicrosConversion extends TimeMicrosConversion {
+ @Override
+ public Long toLong(LocalTime time, Schema schema, LogicalType type) {
+ return 1000 * (long) time.millisOfDay().get();
+ }
+ }
+
+ public static class TimestampConversion extends Conversion<DateTime> {
+ @Override
+ public Class<DateTime> getConvertedType() {
+ return DateTime.class;
+ }
+
+ @Override
+ public String getLogicalTypeName() {
+ return "timestamp-millis";
+ }
+
+ @Override
+ public DateTime fromLong(Long millisFromEpoch, Schema schema, LogicalType
type) {
+ return new DateTime(millisFromEpoch, DateTimeZone.UTC);
+ }
+
+ @Override
+ public Long toLong(DateTime timestamp, Schema schema, LogicalType type) {
+ return timestamp.getMillis();
+ }
+ }
+
+ public static class TimestampMicrosConversion extends Conversion<DateTime> {
Review Comment:
I know this matches the conversions in Avro 1.8.2, but it seems strange
that fromLong takes micros while toLong produces millis... Are you aware of any
particular reason for this?
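To spell it out (illustration only, reading toLong the way described above), a micros value converted to DateTime and back comes out a factor of 1000 smaller:

```java
TimestampMicrosConversion conversion = new TimestampMicrosConversion();
Schema timestampMicros =
    LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));

long micros = 1_500_000L; // 1.5 s after the epoch, in micros
DateTime dateTime = conversion.fromLong(micros, timestampMicros, LogicalTypes.timestampMicros());
long roundTripped = conversion.toLong(dateTime, timestampMicros, LogicalTypes.timestampMicros());
// roundTripped == 1_500 (millis), not 1_500_000 (micros)
```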