RustedBones commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1214111539
##########
sdks/java/extensions/avro/build.gradle:
##########
@@ -63,67 +74,91 @@ dependencies {
testImplementation library.java.junit
testRuntimeOnly project(path: ":runners:direct-java", configuration:
"shadow")
testRuntimeOnly library.java.slf4j_jdk14
- avroVersions.each {
- "avroVersion$it.key" "org.apache.avro:avro:$it.value"
- "avroVersion$it.key" "org.apache.avro:avro-tools:$it.value"
+ avroVersions.each { k,v ->
+ "avroVersion$k"(project(path: ":sdks:java:core", configuration:
"shadowTest")) {
+ // Exclude Avro dependencies from "core" since Avro support moved to
this extension
+ exclude group: "org.apache.avro", module: "avro"
+ }
+ "avroVersion$k" library.java.junit
+ "avroVersion$k" project(path: ":runners:direct-java", configuration:
"shadow")
+ "avroVersion$k" library.java.slf4j_jdk14
+ "avroVersion$k" "org.tukaani:xz:1.9" // makes as provided since 1.9
+ "avroVersion$k" library.java.zstd_jni // makes as provided since 1.9
+ "avroVersion${k}Generate" "org.apache.avro:avro-tools:$v"
}
}
-avroVersions.each { kv ->
- configurations."avroVersion$kv.key" {
- resolutionStrategy {
- force "org.apache.avro:avro:$kv.value"
+task printSourceSetInformation(){
+
+ doLast{
+ sourceSets.each { srcSet ->
+ println "["+srcSet.name+"]"
+ print "-->Source directories: "+srcSet.allJava.srcDirs+"\n"
+ print "-->Output directories: "+srcSet.output.classesDirs.files+"\n"
+ print "-->Compile classpath:\n"
+ srcSet.runtimeClasspath.files.each {
+ print " "+it.path+"\n"
+ }
+ println ""
}
}
+}
+
+avroVersions.each { k, v ->
+ configurations."avroVersion$k" {
+ resolutionStrategy.force "org.apache.avro:avro:$v",
"org.apache.avro:avro:$v:tests", library.java.joda_time
+ }
sourceSets {
- "avro${kv.key}" {
+ "avro$k" {
java {
- srcDirs "build/generated/sources/avro${kv.key}/test/java"
+ srcDirs "src/test/java", "build/generated/sources/avro$k/test/java"
}
- compileClasspath = configurations."avroVersion$kv.key" +
sourceSets.test.output + sourceSets.test.compileClasspath
- runtimeClasspath += compileClasspath + sourceSets.test.runtimeClasspath
Review Comment:
I wasn't confident about this classpath. Every Avro test version would contain
the `1.8.2` generated model, so when testing with the `Class<T>` API we have no
guarantee of loading the proper class.
Here I changed the settings so that all tests are recompiled against each Avro
version, using only that version's generated classes.
##########
sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java:
##########
@@ -323,35 +314,71 @@ public void testKryoSerialization() throws Exception {
@Test
public void testPojoEncoding() throws Exception {
Pojo value = new Pojo("Hello", 42, DATETIME_A);
- AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
+ AvroCoder<Pojo> coder = AvroCoder.reflect(Pojo.class);
CoderProperties.coderDecodeEncodeEqual(coder, value);
}
@Test
public void testSpecificRecordEncoding() throws Exception {
- if (isBrokenMapComparison()) {
- // Don't compare the map values because of AVRO-2943
- AVRO_SPECIFIC_RECORD.setMap(ImmutableMap.of());
- }
- AvroCoder<TestAvro> coder =
- AvroCoder.of(TestAvro.class, AVRO_SPECIFIC_RECORD.getSchema(), false);
+ // Don't compare the map values because of AVRO-2943
+ AVRO_SPECIFIC_RECORD.setMap(ImmutableMap.of());
+
+ AvroCoder<TestAvro> coder = AvroCoder.specific(TestAvro.class);
+ AvroCoder<TestAvro> coderWithSchema =
+ AvroCoder.specific(TestAvro.class, AVRO_SPECIFIC_RECORD.getSchema());
assertTrue(SpecificRecord.class.isAssignableFrom(coder.getType()));
+
assertTrue(SpecificRecord.class.isAssignableFrom(coderWithSchema.getType()));
+
CoderProperties.coderDecodeEncodeEqual(coder, AVRO_SPECIFIC_RECORD);
+ CoderProperties.coderDecodeEncodeEqual(coderWithSchema,
AVRO_SPECIFIC_RECORD);
}
- private boolean isBrokenMapComparison() {
- return VERSION_AVRO.equals("1.9.2")
- || VERSION_AVRO.equals("1.10.2")
- || VERSION_AVRO.equals("1.11.1");
Review Comment:
All Avro-generated classes use `CharSequence` now.
##########
sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java:
##########
@@ -78,13 +77,37 @@ public class AvroUtilsTest {
private static final org.apache.avro.Schema NULL_SCHEMA =
org.apache.avro.Schema.create(Type.NULL);
+ private static final String VERSION_AVRO =
+ org.apache.avro.Schema.class.getPackage().getImplementationVersion();
+
+ private Iterable<?> randomData(org.apache.avro.Schema schema, int maxLength)
throws Exception {
+ Iterable<?> data;
+ if (VERSION_AVRO.equals("1.8.2")) {
+ data =
+ (Iterable<?>)
+ Class.forName("org.apache.avro.RandomData")
+ .getDeclaredConstructor(org.apache.avro.Schema.class,
Integer.TYPE)
+ .newInstance(schema, maxLength);
+ } else {
+ data =
+ (Iterable<?>)
+ Class.forName("org.apache.avro.util.RandomData")
+ .getDeclaredConstructor(org.apache.avro.Schema.class,
Integer.TYPE, Boolean.TYPE)
+ // force Utf8 in random data to match with String type used
in AvroUtils
+ .newInstance(schema, maxLength, true);
Review Comment:
In Avro 1.9+, `RandomData` has moved from the `tests` classifier library into
the `util` package of the main library, and its default behavior has changed to
produce `String` instead of `Utf8`.
##########
sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/io/AvroSourceTest.java:
##########
@@ -86,6 +89,17 @@ private enum SyncBehavior {
private static final int DEFAULT_RECORD_COUNT = 1000;
+ private Iterable<String> avroSupportedCodec() {
+ List<String> codecs = new ArrayList<>();
+ codecs.add(DataFileConstants.NULL_CODEC);
+ codecs.add(DataFileConstants.BZIP2_CODEC);
+ codecs.add(DataFileConstants.DEFLATE_CODEC);
+ codecs.add(DataFileConstants.SNAPPY_CODEC);
+ codecs.add(DataFileConstants.XZ_CODEC);
+ if (!VERSION_AVRO.equals("1.8.2")) codecs.add("zstandard");
Review Comment:
Added `zstandard` to the codecs tested for Avro 1.9+.
##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroSource.java:
##########
@@ -270,11 +265,19 @@ public AvroSource<T>
withEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreat
/** Reads files containing records that conform to the given schema. */
public AvroSource<GenericRecord> withSchema(String schema) {
checkArgument(schema != null, "schema can not be null");
+ if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
+ return new AvroSource<>(
+ getSingleFileMetadata(),
+ getMinBundleSize(),
+ getStartOffset(),
+ getEndOffset(),
+ mode.withSchema(schema));
+ }
Review Comment:
I think this case was missed. It was possible to have
`SINGLE_FILE_OR_SUBRANGE` for typed Avro but not for generic Avro.
##########
sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java:
##########
@@ -323,35 +314,71 @@ public void testKryoSerialization() throws Exception {
@Test
public void testPojoEncoding() throws Exception {
Pojo value = new Pojo("Hello", 42, DATETIME_A);
- AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
+ AvroCoder<Pojo> coder = AvroCoder.reflect(Pojo.class);
CoderProperties.coderDecodeEncodeEqual(coder, value);
}
@Test
public void testSpecificRecordEncoding() throws Exception {
- if (isBrokenMapComparison()) {
- // Don't compare the map values because of AVRO-2943
- AVRO_SPECIFIC_RECORD.setMap(ImmutableMap.of());
- }
- AvroCoder<TestAvro> coder =
- AvroCoder.of(TestAvro.class, AVRO_SPECIFIC_RECORD.getSchema(), false);
+ // Don't compare the map values because of AVRO-2943
+ AVRO_SPECIFIC_RECORD.setMap(ImmutableMap.of());
+
+ AvroCoder<TestAvro> coder = AvroCoder.specific(TestAvro.class);
+ AvroCoder<TestAvro> coderWithSchema =
+ AvroCoder.specific(TestAvro.class, AVRO_SPECIFIC_RECORD.getSchema());
assertTrue(SpecificRecord.class.isAssignableFrom(coder.getType()));
+
assertTrue(SpecificRecord.class.isAssignableFrom(coderWithSchema.getType()));
+
CoderProperties.coderDecodeEncodeEqual(coder, AVRO_SPECIFIC_RECORD);
+ CoderProperties.coderDecodeEncodeEqual(coderWithSchema,
AVRO_SPECIFIC_RECORD);
}
- private boolean isBrokenMapComparison() {
- return VERSION_AVRO.equals("1.9.2")
- || VERSION_AVRO.equals("1.10.2")
- || VERSION_AVRO.equals("1.11.1");
+ // example to overcome AVRO-2943 limitation with custom datum factory
+ // force usage of String instead of Utf8 when avro type is set to
CharSequence
+ static class CustomSpecificDatumFactory<T> extends
AvroDatumFactory.SpecificDatumFactory<T> {
Review Comment:
@mosche here is an example of a custom `AvroDatumFactory`.
In this case, we change the default model creation by using `String` as the
underlying `CharSequence` implementation instead of `Utf8`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]