This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2c58dca500d [FLINK-25962][avro] Use namespaces for generated records
2c58dca500d is described below
commit 2c58dca500d0ec4f5d80852aa96ddb9c06ae4d61
Author: Ryan Skraba <[email protected]>
AuthorDate: Tue Aug 2 14:53:31 2022 +0200
[FLINK-25962][avro] Use namespaces for generated records
---
.../util/kafka/SQLClientSchemaRegistryITCase.java | 2 +-
.../avro/typeutils/AvroSchemaConverter.java | 4 +--
.../flink/formats/avro/AvroBulkFormatTest.java | 30 ++++++++++++++--------
.../avro/typeutils/AvroSchemaConverterTest.java | 3 +++
4 files changed, 26 insertions(+), 13 deletions(-)
diff --git
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
index cb4285bf961..77960650add 100644
---
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
+++
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
@@ -116,7 +116,7 @@ public class SQLClientSchemaRegistryITCase {
String testResultsTopic = "test-results-" +
UUID.randomUUID().toString();
kafkaClient.createTopic(1, 1, testCategoryTopic);
Schema categoryRecord =
- SchemaBuilder.record("record")
+ SchemaBuilder.record("org.apache.flink.avro.generated.record")
.fields()
.requiredLong("category_id")
.optionalString("name")
diff --git
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
index d84ed069b9b..042a343af12 100644
---
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
+++
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -296,14 +296,14 @@ public class AvroSchemaConverter {
/**
* Converts Flink SQL {@link LogicalType} (can be nested) into an Avro
schema.
*
- * <p>Use "record" as the type name.
+ * <p>Use "org.apache.flink.avro.generated.record" as the type name.
*
* @param schema the schema type, usually it should be the top level
record type, e.g. not a
* nested type
* @return Avro's {@link Schema} matching this logical type.
*/
public static Schema convertToSchema(LogicalType schema) {
- return convertToSchema(schema, "record");
+ return convertToSchema(schema,
"org.apache.flink.avro.generated.record");
}
/**
diff --git
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatTest.java
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatTest.java
index 28798bb07a7..28361ce2426 100644
---
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatTest.java
+++
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatTest.java
@@ -59,7 +59,7 @@ class AvroBulkFormatTest {
private static final List<RowData> TEST_DATA =
Arrays.asList(
- // -------- batch 0, block start 186 --------
+ // -------- batch 0, block start 232 --------
GenericRowData.of(
StringData.fromString("AvroBulk"),
StringData.fromString("FormatTest")),
GenericRowData.of(
@@ -71,20 +71,20 @@ class AvroBulkFormatTest {
+ "以为流觞曲水,列坐其次。虽无丝竹管弦之盛,一觞一咏,亦足以畅"
+ "叙幽情。"),
StringData.fromString("")),
- // -------- batch 1, block start 547 --------
+ // -------- batch 1, block start 593 --------
GenericRowData.of(
StringData.fromString("File"),
StringData.fromString("Format")),
GenericRowData.of(
null,
StringData.fromString(
"This is a string with English, 中文 and
even 🍎🍌🍑🥝🍍🥭🍐")),
- // -------- batch 2, block start 659 --------
+ // -------- batch 2, block start 705 --------
GenericRowData.of(
StringData.fromString("block with"),
StringData.fromString("only one record"))
- // -------- file length 706 --------
+ // -------- file length 752 --------
);
- private static final List<Integer> BLOCK_STARTS = Arrays.asList(186, 547,
659);
+ private static final List<Long> BLOCK_STARTS = Arrays.asList(232L, 593L,
705L);
private File tmpFile;
@@ -101,13 +101,23 @@ class AvroBulkFormatTest {
DatumWriter<GenericRecord> datumWriter = new
GenericDatumWriter<>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new
DataFileWriter<>(datumWriter);
dataFileWriter.create(schema, out);
- dataFileWriter.setSyncInterval(64);
-
- for (RowData rowData : TEST_DATA) {
- dataFileWriter.append((GenericRecord) converter.convert(schema,
rowData));
- }
+ // Generate the sync points manually in order to test blocks.
+ long syncBlock1 = dataFileWriter.sync();
+ dataFileWriter.append((GenericRecord) converter.convert(schema,
TEST_DATA.get(0)));
+ dataFileWriter.append((GenericRecord) converter.convert(schema,
TEST_DATA.get(1)));
+ dataFileWriter.append((GenericRecord) converter.convert(schema,
TEST_DATA.get(2)));
+ long syncBlock2 = dataFileWriter.sync();
+ dataFileWriter.append((GenericRecord) converter.convert(schema,
TEST_DATA.get(3)));
+ dataFileWriter.append((GenericRecord) converter.convert(schema,
TEST_DATA.get(4)));
+ long syncBlock3 = dataFileWriter.sync();
+ dataFileWriter.append((GenericRecord) converter.convert(schema,
TEST_DATA.get(5)));
+ long syncEnd = dataFileWriter.sync();
dataFileWriter.close();
+
+ // These values should be constant if nothing else changes with the
file.
+ assertThat(BLOCK_STARTS).isEqualTo(Arrays.asList(syncBlock1,
syncBlock2, syncBlock3));
+ assertThat(tmpFile).hasSize(syncEnd);
}
@AfterEach
diff --git
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
index 7556d60a63c..df3731401da 100644
---
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
+++
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
@@ -190,6 +190,7 @@ class AvroSchemaConverterTest {
"{\n"
+ " \"type\" : \"record\",\n"
+ " \"name\" : \"record\",\n"
+ + " \"namespace\" :
\"org.apache.flink.avro.generated\",\n"
+ " \"fields\" : [ {\n"
+ " \"name\" : \"row1\",\n"
+ " \"type\" : [ \"null\", {\n"
@@ -326,6 +327,7 @@ class AvroSchemaConverterTest {
"{\n"
+ " \"type\" : \"record\",\n"
+ " \"name\" : \"record\",\n"
+ + " \"namespace\" :
\"org.apache.flink.avro.generated\",\n"
+ " \"fields\" : [ {\n"
+ " \"name\" : \"f_null\",\n"
+ " \"type\" : \"null\",\n"
@@ -435,6 +437,7 @@ class AvroSchemaConverterTest {
"{\n"
+ " \"type\" : \"record\",\n"
+ " \"name\" : \"record\",\n"
+ + " \"namespace\" :
\"org.apache.flink.avro.generated\",\n"
+ " \"fields\" : [ {\n"
+ " \"name\" : \"f_boolean\",\n"
+ " \"type\" : \"boolean\"\n"