This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 88d4712 [BEAM-12385] Handle VARCHAR and other SQL specific logical
types in AvroUtils
new 62e8f84 Merge pull request #14858: [BEAM-12385] Handle VARCHAR and
other SQL specific logical types in AvroUtils
88d4712 is described below
commit 88d4712147911744761cb385b8226c81e283d1fe
Author: Anant Damle <[email protected]>
AuthorDate: Fri May 21 23:47:53 2021 +0800
[BEAM-12385] Handle VARCHAR and other SQL specific logical types in
AvroUtils
---
CHANGES.md | 3 +
.../apache/beam/sdk/schemas/utils/AvroUtils.java | 43 ++++
.../beam/sdk/schemas/utils/AvroUtilsTest.java | 244 +++++++++++++++++++++
.../apache/beam/sdk/io/jdbc/SchemaUtilTest.java | 60 +++++
4 files changed, 350 insertions(+)
diff --git a/CHANGES.md b/CHANGES.md
index 7029cd2..62b8e71 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -66,6 +66,9 @@
* `CREATE FUNCTION` DDL statement added to Calcite SQL syntax. `JAR` and
`AGGREGATE` are now reserved keywords.
([BEAM-12339](https://issues.apache.org/jira/browse/BEAM-12339)).
* Flink 1.13 is now supported by the Flink runner
([BEAM-12277](https://issues.apache.org/jira/browse/BEAM-12277)).
* X feature added (Java/Python)
([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* Add support to convert Beam Schema to Avro Schema for JDBC LogicalTypes:
+ `VARCHAR`, `NVARCHAR`, `LONGVARCHAR`, `LONGNVARCHAR`, `DATE`, `TIME`
+ (Java)([BEAM-12385](https://issues.apache.org/jira/browse/BEAM-12385)).
## Breaking Changes
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
index 77b5445..0835f9b 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
@@ -906,6 +906,26 @@ public class AvroUtils {
.map(x -> getFieldSchema(x.getType(), x.getName(),
namespace))
.collect(Collectors.toList()));
break;
+ case "CHAR":
+ case "NCHAR":
+ baseType =
+ buildHiveLogicalTypeSchema("char", (int)
fieldType.getLogicalType().getArgument());
+ break;
+ case "NVARCHAR":
+ case "VARCHAR":
+ case "LONGNVARCHAR":
+ case "LONGVARCHAR":
+ baseType =
+ buildHiveLogicalTypeSchema(
+ "varchar", (int) fieldType.getLogicalType().getArgument());
+ break;
+ case "DATE":
+ baseType =
LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT));
+ break;
+ case "TIME":
+ baseType =
+
LogicalTypes.timeMillis().addToSchema(org.apache.avro.Schema.create(Type.INT));
+ break;
default:
throw new RuntimeException(
"Unhandled logical type " +
fieldType.getLogicalType().getIdentifier());
@@ -1017,6 +1037,15 @@ public class AvroUtils {
typeWithNullability.type.getTypes().get(oneOfValue.getCaseType().getValue()),
oneOfValue.getValue());
}
+ case "NVARCHAR":
+ case "VARCHAR":
+ case "LONGNVARCHAR":
+ case "LONGVARCHAR":
+ return new Utf8((String) value);
+ case "DATE":
+ return Days.daysBetween(Instant.EPOCH, (Instant) value).getDays();
+ case "TIME":
+ return (int) ((Instant) value).getMillis();
default:
throw new RuntimeException(
"Unhandled logical type " +
fieldType.getLogicalType().getIdentifier());
@@ -1277,4 +1306,18 @@ public class AvroUtils {
checkArgument(
got.equals(expected), "Can't convert '%s' to %s, expected: %s", label,
got, expected);
}
+
+ /**
+ * Helper factory to build Avro logical type schemas for SQL *CHAR types. This method <a
+ *
href="https://github.com/apache/hive/blob/5d268834a5f5278ea76399f8af0d0ab043ae0b45/serde/src/java/org/apache/hadoop/hive/serde2/avro/TypeInfoToSchema.java#L110-L121">represents
+ * the logical type as Hive does</a>.
+ */
+ private static org.apache.avro.Schema buildHiveLogicalTypeSchema(
+ String hiveLogicalType, int size) {
+ String schemaJson =
+ String.format(
+ "{\"type\": \"string\", \"logicalType\": \"%s\", \"maxLength\":
%s}",
+ hiveLogicalType, size);
+ return new org.apache.avro.Schema.Parser().parse(schemaJson);
+ }
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
index 627d555..c1096ca 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
@@ -26,6 +26,7 @@ import com.pholser.junit.quickcheck.Property;
import com.pholser.junit.quickcheck.runner.JUnitQuickcheck;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.sql.JDBCType;
import java.util.List;
import java.util.Map;
import org.apache.avro.Conversions;
@@ -57,8 +58,13 @@ import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+import org.joda.time.Instant;
+import org.joda.time.LocalTime;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -551,6 +557,164 @@ public class AvroUtilsTest {
}
@Test
+ public void testJdbcLogicalVarCharRowDataToAvroSchema() {
+ String expectedAvroSchemaJson =
+ "{ "
+ + " \"name\": \"topLevelRecord\", "
+ + " \"type\": \"record\", "
+ + " \"fields\": [{ "
+ + " \"name\": \"my_varchar_field\", "
+ + " \"type\": {\"type\": \"string\", \"logicalType\":
\"varchar\", \"maxLength\": 10}"
+ + " }, "
+ + " { "
+ + " \"name\": \"my_longvarchar_field\", "
+ + " \"type\": {\"type\": \"string\", \"logicalType\":
\"varchar\", \"maxLength\": 50}"
+ + " }, "
+ + " { "
+ + " \"name\": \"my_nvarchar_field\", "
+ + " \"type\": {\"type\": \"string\", \"logicalType\":
\"varchar\", \"maxLength\": 10}"
+ + " }, "
+ + " { "
+ + " \"name\": \"my_longnvarchar_field\", "
+ + " \"type\": {\"type\": \"string\", \"logicalType\":
\"varchar\", \"maxLength\": 50}"
+ + " }, "
+ + " { "
+ + " \"name\": \"fixed_length_char_field\", "
+ + " \"type\": {\"type\": \"string\", \"logicalType\": \"char\",
\"maxLength\": 25}"
+ + " } "
+ + " ] "
+ + "}";
+
+ Schema beamSchema =
+ Schema.builder()
+ .addField(
+ Field.of(
+ "my_varchar_field",
FieldType.logicalType(JdbcType.StringType.varchar(10))))
+ .addField(
+ Field.of(
+ "my_longvarchar_field",
+
FieldType.logicalType(JdbcType.StringType.longvarchar(50))))
+ .addField(
+ Field.of(
+ "my_nvarchar_field",
FieldType.logicalType(JdbcType.StringType.nvarchar(10))))
+ .addField(
+ Field.of(
+ "my_longnvarchar_field",
+
FieldType.logicalType(JdbcType.StringType.longnvarchar(50))))
+ .addField(
+ Field.of(
+ "fixed_length_char_field",
+
FieldType.logicalType(JdbcType.StringType.fixedLengthChar(25))))
+ .build();
+
+ assertEquals(
+ new org.apache.avro.Schema.Parser().parse(expectedAvroSchemaJson),
+ AvroUtils.toAvroSchema(beamSchema));
+ }
+
+ @Test
+ public void testJdbcLogicalVarCharRowDataToGenericRecord() {
+ Schema beamSchema =
+ Schema.builder()
+ .addField(
+ Field.of(
+ "my_varchar_field",
FieldType.logicalType(JdbcType.StringType.varchar(10))))
+ .addField(
+ Field.of(
+ "my_longvarchar_field",
+
FieldType.logicalType(JdbcType.StringType.longvarchar(50))))
+ .addField(
+ Field.of(
+ "my_nvarchar_field",
FieldType.logicalType(JdbcType.StringType.nvarchar(10))))
+ .addField(
+ Field.of(
+ "my_longnvarchar_field",
+
FieldType.logicalType(JdbcType.StringType.longnvarchar(50))))
+ .build();
+
+ Row rowData =
+ Row.withSchema(beamSchema)
+ .addValue("varchar_value")
+ .addValue("longvarchar_value")
+ .addValue("nvarchar_value")
+ .addValue("longnvarchar_value")
+ .build();
+
+ org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
+ GenericRecord expectedRecord =
+ new GenericRecordBuilder(avroSchema)
+ .set("my_varchar_field", "varchar_value")
+ .set("my_longvarchar_field", "longvarchar_value")
+ .set("my_nvarchar_field", "nvarchar_value")
+ .set("my_longnvarchar_field", "longnvarchar_value")
+ .build();
+
+ assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData,
avroSchema));
+ }
+
+ @Test
+ public void testJdbcLogicalDateAndTimeRowDataToAvroSchema() {
+ String expectedAvroSchemaJson =
+ "{ "
+ + " \"name\": \"topLevelRecord\", "
+ + " \"type\": \"record\", "
+ + " \"fields\": [{ "
+ + " \"name\": \"my_date_field\", "
+ + " \"type\": { \"type\": \"int\", \"logicalType\": \"date\" }"
+ + " }, "
+ + " { "
+ + " \"name\": \"my_time_field\", "
+ + " \"type\": { \"type\": \"int\", \"logicalType\":
\"time-millis\" }"
+ + " }"
+ + " ] "
+ + "}";
+
+ Schema beamSchema =
+ Schema.builder()
+ .addField(Field.of("my_date_field",
FieldType.logicalType(JdbcType.DATE)))
+ .addField(Field.of("my_time_field",
FieldType.logicalType(JdbcType.TIME)))
+ .build();
+
+ assertEquals(
+ new org.apache.avro.Schema.Parser().parse(expectedAvroSchemaJson),
+ AvroUtils.toAvroSchema(beamSchema));
+ }
+
+ @Test
+ public void testJdbcLogicalDateAndTimeRowDataToGenericRecord() {
+ // Fixed timestamp used to derive both the date and time field values below.
+ DateTime testDateTime = DateTime.parse("2021-05-29T11:15:16.234Z");
+
+ Schema beamSchema =
+ Schema.builder()
+ .addField(Field.of("my_date_field",
FieldType.logicalType(JdbcType.DATE)))
+ .addField(Field.of("my_time_field",
FieldType.logicalType(JdbcType.TIME)))
+ .build();
+
+ Row rowData =
+ Row.withSchema(beamSchema)
+
.addValue(testDateTime.toLocalDate().toDateTime(LocalTime.MIDNIGHT).toInstant())
+
.addValue(Instant.ofEpochMilli(testDateTime.toLocalTime().millisOfDay().get()))
+ .build();
+
+ int daysFromEpoch =
+ Days.daysBetween(
+ Instant.EPOCH,
+
testDateTime.toLocalDate().toDateTime(LocalTime.MIDNIGHT).toInstant())
+ .getDays();
+ int timeSinceMidNight = testDateTime.toLocalTime().getMillisOfDay();
+
+ org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
+ GenericRecord expectedRecord =
+ new GenericRecordBuilder(avroSchema)
+ .set("my_date_field", daysFromEpoch)
+ .set("my_time_field", timeSinceMidNight)
+ .build();
+
+ assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData,
avroSchema));
+ }
+
+ @Test
public void testBeamRowToGenericRecord() {
GenericRecord genericRecord = AvroUtils.toGenericRecord(getBeamRow(),
null);
assertEquals(getAvroSchema(), genericRecord.getSchema());
@@ -640,4 +804,84 @@ public class AvroUtilsTest {
AvroUtils.getFromRowFunction(GenericRecord.class),
AvroUtils.getFromRowFunction(GenericRecord.class));
}
+
+ /** Helper class that simulates JDBC logical types. */
+ private static class JdbcType<T> implements Schema.LogicalType<T, T> {
+
+ private static final JdbcType<Instant> DATE =
+ new JdbcType<>(JDBCType.DATE, FieldType.STRING, FieldType.DATETIME,
"");
+ private static final JdbcType<Instant> TIME =
+ new JdbcType<>(JDBCType.TIME, FieldType.STRING, FieldType.DATETIME,
"");
+
+ private final String identifier;
+ private final FieldType argumentType;
+ private final FieldType baseType;
+ private final Object argument;
+
+ private static class StringType extends JdbcType<String> {
+
+ private static StringType fixedLengthChar(int size) {
+ return new StringType(JDBCType.CHAR, size);
+ }
+
+ private static StringType varchar(int size) {
+ return new StringType(JDBCType.VARCHAR, size);
+ }
+
+ private static StringType longvarchar(int size) {
+ return new StringType(JDBCType.LONGVARCHAR, size);
+ }
+
+ private static StringType nvarchar(int size) {
+ return new StringType(JDBCType.NVARCHAR, size);
+ }
+
+ private static StringType longnvarchar(int size) {
+ return new StringType(JDBCType.LONGNVARCHAR, size);
+ }
+
+ private StringType(JDBCType type, int size) {
+ super(type, FieldType.INT32, FieldType.STRING, size);
+ }
+ }
+
+ private JdbcType(
+ JDBCType jdbcType, FieldType argumentType, FieldType baseType, Object
argument) {
+ this.identifier = jdbcType.getName();
+ this.argumentType = argumentType;
+ this.baseType = baseType;
+ this.argument = argument;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public @Nullable FieldType getArgumentType() {
+ return argumentType;
+ }
+
+ @Override
+ public FieldType getBaseType() {
+ return baseType;
+ }
+
+ @Override
+ @SuppressWarnings("TypeParameterUnusedInFormals")
+ public <T1> @Nullable T1 getArgument() {
+ return (T1) argument;
+ }
+
+ @Override
+ public @NonNull T toBaseType(@NonNull T input) {
+ return input;
+ }
+
+ @Override
+ public @NonNull T toInputType(@NonNull T base) {
+ return base;
+ }
+ }
}
diff --git
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java
index 7ec9e7b..18acf04 100644
---
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java
+++
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java
@@ -37,6 +37,7 @@ import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.values.Row;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.DateTime;
@@ -224,6 +225,65 @@ public class SchemaUtilTest {
}
@Test
+ public void testJdbcLogicalTypesMapValidAvroSchemaIT() {
+ String expectedAvroSchema =
+ "{"
+ + " \"type\": \"record\","
+ + " \"name\": \"topLevelRecord\","
+ + " \"fields\": [{"
+ + " \"name\": \"longvarchar_col\","
+ + " \"type\": {"
+ + " \"type\": \"string\","
+ + " \"logicalType\": \"varchar\","
+ + " \"maxLength\": 50"
+ + " }"
+ + " }, {"
+ + " \"name\": \"varchar_col\","
+ + " \"type\": {"
+ + " \"type\": \"string\","
+ + " \"logicalType\": \"varchar\","
+ + " \"maxLength\": 15"
+ + " }"
+ + " }, {"
+ + " \"name\": \"fixedlength_char_col\","
+ + " \"type\": {"
+ + " \"type\": \"string\","
+ + " \"logicalType\": \"char\","
+ + " \"maxLength\": 25"
+ + " }"
+ + " }, {"
+ + " \"name\": \"date_col\","
+ + " \"type\": {"
+ + " \"type\": \"int\","
+ + " \"logicalType\": \"date\""
+ + " }"
+ + " }, {"
+ + " \"name\": \"time_col\","
+ + " \"type\": {"
+ + " \"type\": \"int\","
+ + " \"logicalType\": \"time-millis\""
+ + " }"
+ + " }]"
+ + "}";
+
+ Schema jdbcRowSchema =
+ Schema.builder()
+ .addField(
+ "longvarchar_col",
LogicalTypes.variableLengthString(JDBCType.LONGVARCHAR, 50))
+ .addField("varchar_col",
LogicalTypes.variableLengthString(JDBCType.VARCHAR, 15))
+ .addField("fixedlength_char_col",
LogicalTypes.fixedLengthString(JDBCType.CHAR, 25))
+ .addField("date_col", LogicalTypes.JDBC_DATE_TYPE)
+ .addField("time_col", LogicalTypes.JDBC_TIME_TYPE)
+ .build();
+
+ System.out.println(AvroUtils.toAvroSchema(jdbcRowSchema));
+
+ assertEquals(
+ new org.apache.avro.Schema.Parser().parse(expectedAvroSchema),
+ AvroUtils.toAvroSchema(jdbcRowSchema));
+ }
+
+ @Test
public void testBeamRowMapperDateTime() throws Exception {
long epochMilli = 1558719710000L;