[
https://issues.apache.org/jira/browse/BEAM-4076?focusedWorklogId=107512&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-107512
]
ASF GitHub Bot logged work on BEAM-4076:
----------------------------------------
Author: ASF GitHub Bot
Created on: 31/May/18 02:13
Start Date: 31/May/18 02:13
Worklog Time Spent: 10m
Work Description: kennknowles closed pull request #5498: [BEAM-4076]
Remove unsafe methods from Schema.TypeName and Schema.FieldType
URL: https://github.com/apache/beam/pull/5498
This is a PR merged from a forked repository. Because GitHub hides the
original diff once a foreign (fork-based) pull request is merged, the
diff is reproduced below for the sake of provenance:
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index 69da5645e11..817e0248ac1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.schemas;
-import static com.google.common.base.Preconditions.checkArgument;
-
import com.google.auto.value.AutoValue;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
@@ -84,69 +82,69 @@ public Builder addNullableField(String name, FieldType
type) {
}
public Builder addByteField(String name) {
- fields.add(Field.of(name, TypeName.BYTE.type()));
+ fields.add(Field.of(name, FieldType.BYTE));
return this;
}
public Builder addInt16Field(String name) {
- fields.add(Field.of(name, TypeName.INT16.type()));
+ fields.add(Field.of(name, FieldType.INT16));
return this;
}
public Builder addInt32Field(String name) {
- fields.add(Field.of(name, TypeName.INT32.type()));
+ fields.add(Field.of(name, FieldType.INT32));
return this;
}
public Builder addInt64Field(String name) {
- fields.add(Field.of(name, TypeName.INT64.type()));
+ fields.add(Field.of(name, FieldType.INT64));
return this;
}
public Builder addDecimalField(String name) {
- fields.add(Field.of(name, TypeName.DECIMAL.type()));
+ fields.add(Field.of(name, FieldType.DECIMAL));
return this;
}
public Builder addFloatField(String name) {
- fields.add(Field.of(name, TypeName.FLOAT.type()));
+ fields.add(Field.of(name, FieldType.FLOAT));
return this;
}
public Builder addDoubleField(String name) {
- fields.add(Field.of(name, TypeName.DOUBLE.type()));
+ fields.add(Field.of(name, FieldType.DOUBLE));
return this;
}
public Builder addStringField(String name) {
- fields.add(Field.of(name, TypeName.STRING.type()));
+ fields.add(Field.of(name, FieldType.STRING));
return this;
}
public Builder addDateTimeField(String name) {
- fields.add(Field.of(name, TypeName.DATETIME.type()));
+ fields.add(Field.of(name, FieldType.DATETIME));
return this;
}
public Builder addBooleanField(String name) {
- fields.add(Field.of(name, TypeName.BOOLEAN.type()));
+ fields.add(Field.of(name, FieldType.BOOLEAN));
return this;
}
public Builder addArrayField(String name, FieldType collectionElementType)
{
fields.add(
- Field.of(name,
TypeName.ARRAY.type().withCollectionElementType(collectionElementType)));
+ Field.of(name, FieldType.array(collectionElementType)));
return this;
}
public Builder addRowField(String name, Schema fieldSchema) {
- fields.add(Field.of(name,
TypeName.ROW.type().withRowSchema(fieldSchema)));
+ fields.add(Field.of(name, FieldType.row(fieldSchema)));
return this;
}
public Builder addMapField(
String name, FieldType keyType, FieldType valueType) {
- fields.add(Field.of(name, TypeName.MAP.type().withMapType(keyType,
valueType)));
+ fields.add(Field.of(name, FieldType.map(keyType, valueType)));
return this;
}
@@ -201,8 +199,13 @@ public int hashCode() {
return fields;
}
- /**
- * An enumerated list of supported types.
+ /** An enumerated list of type constructors.
+ *
+ * <ul>
+ * <li>Atomic types are built from type constructors that take no arguments
+ * <li>Arrays, rows, and maps are type constructors that take additional
+ * arguments to form a valid {@link FieldType}.
+ * </ul>
*/
public enum TypeName {
BYTE, // One-byte signed integer.
@@ -248,16 +251,6 @@ public boolean isMapType() {
public boolean isCompositeType() {
return COMPOSITE_TYPES.contains(this);
}
-
- /**
- * Returns a {@link FieldType} representing this primitive type.
- *
- * @deprecated a {@link TypeName} is not a type, so this conversion is not
sound.
- */
- @Deprecated
- public FieldType type() {
- return FieldType.of(this);
- }
}
/**
@@ -269,22 +262,28 @@ public FieldType type() {
public abstract static class FieldType implements Serializable {
// Returns the type of this field.
public abstract TypeName getTypeName();
+
// For container types (e.g. ARRAY), returns the type of the contained
element.
@Nullable public abstract FieldType getCollectionElementType();
+
// For MAP type, returns the type of the key element, it must be a
primitive type;
@Nullable public abstract FieldType getMapKeyType();
+
// For MAP type, returns the type of the value element, it can be a nested
type;
@Nullable public abstract FieldType getMapValueType();
+
// For ROW types, returns the schema for the row.
@Nullable public abstract Schema getRowSchema();
+
/**
* Returns optional extra metadata.
*/
@SuppressWarnings("mutable")
@Nullable public abstract byte[] getMetadata();
+
abstract FieldType.Builder toBuilder();
- public static Builder forTypeName(TypeName typeName) {
+ public static FieldType.Builder forTypeName(TypeName typeName) {
return new AutoValue_Schema_FieldType.Builder().setTypeName(typeName);
}
@@ -354,41 +353,6 @@ public static final FieldType row(Schema schema) {
return FieldType.forTypeName(TypeName.ROW).setRowSchema(schema).build();
}
- /**
- * For container types, adds the type of the component element.
- */
- public FieldType withCollectionElementType(@Nullable FieldType
collectionElementType) {
- if (collectionElementType != null) {
- checkArgument(getTypeName().isCollectionType());
- }
- return
toBuilder().setCollectionElementType(collectionElementType).build();
- }
-
- /**
- * For MAP type, adds the type of the component key/value element.
- */
- public FieldType withMapType(
- @Nullable FieldType mapKeyType,
- @Nullable FieldType mapValueType) {
- if (mapKeyType != null && mapValueType != null) {
- checkArgument(getTypeName().isMapType());
- checkArgument(mapKeyType.getTypeName().isPrimitiveType());
- }
- return toBuilder()
- .setMapKeyType(mapKeyType)
- .setMapValueType(mapValueType).build();
- }
-
- /**
- * For ROW types, sets the schema of the row.
- */
- public FieldType withRowSchema(@Nullable Schema rowSchema) {
- if (rowSchema != null) {
- checkArgument(getTypeName().isCompositeType());
- }
- return toBuilder().setRowSchema(rowSchema).build();
- }
-
/**
* Returns a copy of the descriptor with metadata set.
*/
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java
index b17f435bda4..4c67f4311c9 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java
@@ -108,7 +108,7 @@ public Row deserialize(JsonParser jsonParser,
DeserializationContext deserializa
(Row) extractJsonNodeValue(
FieldValue.of(
"root",
- TypeName.ROW.type().withRowSchema(schema),
+ FieldType.row(schema),
jsonParser
.readValueAsTree()));
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/org/apache/beam/sdk/coders/RowCoderTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/org/apache/beam/sdk/coders/RowCoderTest.java
index 675b9422eda..3ee9d17a249 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/org/apache/beam/sdk/coders/RowCoderTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/org/apache/beam/sdk/coders/RowCoderTest.java
@@ -28,7 +28,6 @@
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@@ -87,7 +86,7 @@ public void testNestedTypes() throws Exception {
@Test
public void testArrays() throws Exception {
Schema schema = Schema.builder()
- .addArrayField("f_array", TypeName.STRING.type())
+ .addArrayField("f_array", FieldType.STRING)
.build();
Row row = Row.withSchema(schema).addArray("one", "two", "three",
"four").build();
checkEncodeDecode(row);
@@ -98,7 +97,7 @@ public void testArrayOfRow() throws Exception {
Schema nestedSchema = Schema.builder()
.addInt32Field("f1_int")
.addStringField("f1_str").build();
- FieldType collectionElementType =
TypeName.ROW.type().withRowSchema(nestedSchema);
+ FieldType collectionElementType = FieldType.row(nestedSchema);
Schema schema = Schema.builder().addArrayField("f_array",
collectionElementType).build();
Row row = Row.withSchema(schema).addArray(
Row.withSchema(nestedSchema).addValues(1, "one").build(),
@@ -110,9 +109,7 @@ public void testArrayOfRow() throws Exception {
@Test
public void testArrayOfArray() throws Exception {
- FieldType arrayType = TypeName.ARRAY.type()
- .withCollectionElementType(TypeName.ARRAY.type()
- .withCollectionElementType(TypeName.INT32.type()));
+ FieldType arrayType = FieldType.array(FieldType.array(FieldType.INT32));
Schema schema = Schema.builder().addField("f_array", arrayType).build();
Row row = Row.withSchema(schema).addArray(
Lists.newArrayList(1, 2, 3, 4),
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java
index fa8470ec89a..22fbbc3174d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java
@@ -24,7 +24,6 @@
import java.util.stream.Stream;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -53,62 +52,59 @@ public void testCreate() {
assertEquals(0, schema.indexOf("f_byte"));
assertEquals("f_byte", schema.getField(0).getName());
- assertEquals(TypeName.BYTE.type(), schema.getField(0).getType());
+ assertEquals(FieldType.BYTE, schema.getField(0).getType());
assertEquals(1, schema.indexOf("f_int16"));
assertEquals("f_int16", schema.getField(1).getName());
- assertEquals(TypeName.INT16.type(), schema.getField(1).getType());
+ assertEquals(FieldType.INT16, schema.getField(1).getType());
assertEquals(2, schema.indexOf("f_int32"));
assertEquals("f_int32", schema.getField(2).getName());
- assertEquals(TypeName.INT32.type(), schema.getField(2).getType());
+ assertEquals(FieldType.INT32, schema.getField(2).getType());
assertEquals(3, schema.indexOf("f_int64"));
assertEquals("f_int64", schema.getField(3).getName());
- assertEquals(TypeName.INT64.type(), schema.getField(3).getType());
+ assertEquals(FieldType.INT64, schema.getField(3).getType());
assertEquals(4, schema.indexOf("f_decimal"));
assertEquals("f_decimal", schema.getField(4).getName());
- assertEquals(TypeName.DECIMAL.type(),
+ assertEquals(FieldType.DECIMAL,
schema.getField(4).getType());
assertEquals(5, schema.indexOf("f_float"));
assertEquals("f_float", schema.getField(5).getName());
- assertEquals(TypeName.FLOAT.type(), schema.getField(5).getType());
+ assertEquals(FieldType.FLOAT, schema.getField(5).getType());
assertEquals(6, schema.indexOf("f_double"));
assertEquals("f_double", schema.getField(6).getName());
- assertEquals(TypeName.DOUBLE.type(), schema.getField(6).getType());
+ assertEquals(FieldType.DOUBLE, schema.getField(6).getType());
assertEquals(7, schema.indexOf("f_string"));
assertEquals("f_string", schema.getField(7).getName());
- assertEquals(TypeName.STRING.type(), schema.getField(7).getType());
+ assertEquals(FieldType.STRING, schema.getField(7).getType());
assertEquals(8, schema.indexOf("f_datetime"));
assertEquals("f_datetime", schema.getField(8).getName());
- assertEquals(TypeName.DATETIME.type(),
+ assertEquals(FieldType.DATETIME,
schema.getField(8).getType());
assertEquals(9, schema.indexOf("f_boolean"));
assertEquals("f_boolean", schema.getField(9).getName());
- assertEquals(TypeName.BOOLEAN.type(), schema.getField(9).getType());
+ assertEquals(FieldType.BOOLEAN, schema.getField(9).getType());
}
@Test
public void testNestedSchema() {
- Schema nestedSchema = Schema.of(
- Field.of("f1_str", TypeName.STRING.type()));
- Schema schema = Schema.of(
- Field.of("nested", TypeName.ROW.type().withRowSchema(nestedSchema)));
+ Schema nestedSchema = Schema.of(Field.of("f1_str", FieldType.STRING));
+ Schema schema = Schema.of(Field.of("nested", FieldType.row(nestedSchema)));
Field inner =
schema.getField("nested").getType().getRowSchema().getField("f1_str");
assertEquals("f1_str", inner.getName());
- assertEquals(TypeName.STRING, inner.getType().getTypeName());
+ assertEquals(FieldType.STRING, inner.getType());
}
@Test
public void testArraySchema() {
- FieldType arrayType = TypeName.ARRAY.type()
- .withCollectionElementType(TypeName.STRING.type());
+ FieldType arrayType = FieldType.array(FieldType.STRING);
Schema schema = Schema.of(Field.of("f_array", arrayType));
Field field = schema.getField("f_array");
assertEquals("f_array", field.getName());
@@ -118,10 +114,8 @@ public void testArraySchema() {
@Test
public void testArrayOfRowSchema() {
Schema nestedSchema = Schema.of(
- Field.of("f1_str", TypeName.STRING.type()));
- FieldType arrayType = TypeName.ARRAY.type()
- .withCollectionElementType(TypeName.ROW.type()
- .withRowSchema(nestedSchema));
+ Field.of("f1_str", FieldType.STRING));
+ FieldType arrayType = FieldType.array(FieldType.row(nestedSchema));
Schema schema = Schema.of(Field.of("f_array", arrayType));
Field field = schema.getField("f_array");
assertEquals("f_array", field.getName());
@@ -130,9 +124,7 @@ public void testArrayOfRowSchema() {
@Test
public void testNestedArraySchema() {
- FieldType arrayType = TypeName.ARRAY.type()
- .withCollectionElementType(TypeName.ARRAY.type()
- .withCollectionElementType(TypeName.STRING.type()));
+ FieldType arrayType = FieldType.array(FieldType.array(FieldType.STRING));
Schema schema = Schema.of(Field.of("f_array", arrayType));
Field field = schema.getField("f_array");
assertEquals("f_array", field.getName());
@@ -141,7 +133,7 @@ public void testNestedArraySchema() {
@Test
public void testWrongName() {
- Schema schema = Schema.of(Field.of("f_byte", TypeName.BYTE.type()));
+ Schema schema = Schema.of(Field.of("f_byte", FieldType.BYTE));
thrown.expect(IllegalArgumentException.class);
schema.getField("f_string");
}
@@ -149,7 +141,7 @@ public void testWrongName() {
@Test
public void testWrongIndex() {
Schema schema = Schema.of(
- Field.of("f_byte", TypeName.BYTE.type()));
+ Field.of("f_byte", FieldType.BYTE));
thrown.expect(IndexOutOfBoundsException.class);
schema.getField(1);
}
@@ -161,15 +153,15 @@ public void testCollector() {
Schema schema =
Stream
.of(
- Schema.Field.of("f_int", TypeName.INT32.type()),
- Schema.Field.of("f_string", TypeName.STRING.type()))
+ Schema.Field.of("f_int", FieldType.INT32),
+ Schema.Field.of("f_string", FieldType.STRING))
.collect(toSchema());
assertEquals(2, schema.getFieldCount());
assertEquals("f_int", schema.getField(0).getName());
- assertEquals(TypeName.INT32, schema.getField(0).getType().getTypeName());
+ assertEquals(FieldType.INT32, schema.getField(0).getType());
assertEquals("f_string", schema.getField(1).getName());
- assertEquals(TypeName.STRING, schema.getField(1).getType().getTypeName());
+ assertEquals(FieldType.STRING, schema.getField(1).getType());
}
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonDeserializerTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonDeserializerTest.java
index e486328b77f..23f1079c64a 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonDeserializerTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonDeserializerTest.java
@@ -17,16 +17,6 @@
*/
package org.apache.beam.sdk.util;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.ARRAY;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.BOOLEAN;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.BYTE;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.DATETIME;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.DOUBLE;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.FLOAT;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.INT16;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.INT32;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.INT64;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.STRING;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.stringContainsInOrder;
@@ -37,7 +27,6 @@
import java.util.Arrays;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
import
org.apache.beam.sdk.util.RowJsonDeserializer.UnsupportedRowJsonException;
import org.apache.beam.sdk.values.Row;
import org.hamcrest.Matcher;
@@ -113,7 +102,7 @@ public void testParsesArrayField() throws Exception {
Schema
.builder()
.addInt32Field("f_int32")
- .addArrayField("f_intArray", INT32.type())
+ .addArrayField("f_intArray", FieldType.INT32)
.build();
String rowString = "{\n"
@@ -140,8 +129,7 @@ public void testParsesArrayOfArrays() throws Exception {
Schema schema =
Schema
.builder()
- .addArrayField("f_arrayOfIntArrays",
-
FieldType.of(ARRAY).withCollectionElementType(INT32.type()))
+ .addArrayField("f_arrayOfIntArrays",
FieldType.array(FieldType.INT32))
.build();
String rowString = "{\n"
@@ -170,8 +158,7 @@ public void testThrowsForMismatchedArrayField() throws
Exception {
Schema schema =
Schema
.builder()
- .addArrayField("f_arrayOfIntArrays",
-
FieldType.of(ARRAY).withCollectionElementType(INT32.type()))
+ .addArrayField("f_arrayOfIntArrays",
FieldType.array(FieldType.INT32))
.build();
String rowString = "{\n"
@@ -321,7 +308,7 @@ public void testThrowsForUnsupportedArrayElementType()
throws Exception {
Schema schema =
Schema
.builder()
- .addArrayField("f_dateTimeArray", DATETIME.type())
+ .addArrayField("f_dateTimeArray", FieldType.DATETIME)
.build();
thrown.expect(UnsupportedRowJsonException.class);
@@ -335,7 +322,7 @@ public void testThrowsForUnsupportedNestedFieldType()
throws Exception {
Schema nestedSchema =
Schema
.builder()
- .addArrayField("f_dateTimeArray", DATETIME.type())
+ .addArrayField("f_dateTimeArray", FieldType.DATETIME)
.build();
Schema schema =
@@ -400,59 +387,59 @@ public void testThrowsForMissingNotNullableField() throws
Exception {
@Test
public void testSupportedBooleanConversions() throws Exception {
- testSupportedConversion(BOOLEAN, BOOLEAN_TRUE_STRING, BOOLEAN_TRUE_VALUE);
+ testSupportedConversion(FieldType.BOOLEAN, BOOLEAN_TRUE_STRING,
BOOLEAN_TRUE_VALUE);
}
@Test
public void testSupportedStringConversions() throws Exception {
- testSupportedConversion(STRING, quoted(FLOAT_STRING), FLOAT_STRING);
+ testSupportedConversion(FieldType.STRING, quoted(FLOAT_STRING),
FLOAT_STRING);
}
@Test
public void testSupportedByteConversions() throws Exception {
- testSupportedConversion(BYTE, BYTE_STRING, BYTE_VALUE);
+ testSupportedConversion(FieldType.BYTE, BYTE_STRING, BYTE_VALUE);
}
@Test
public void testSupportedShortConversions() throws Exception {
- testSupportedConversion(INT16, BYTE_STRING, (short) BYTE_VALUE);
- testSupportedConversion(INT16, SHORT_STRING, SHORT_VALUE);
+ testSupportedConversion(FieldType.INT16, BYTE_STRING, (short) BYTE_VALUE);
+ testSupportedConversion(FieldType.INT16, SHORT_STRING, SHORT_VALUE);
}
@Test
public void testSupportedIntConversions() throws Exception {
- testSupportedConversion(INT32, BYTE_STRING, (int) BYTE_VALUE);
- testSupportedConversion(INT32, SHORT_STRING, (int) SHORT_VALUE);
- testSupportedConversion(INT32, INT_STRING, INT_VALUE);
+ testSupportedConversion(FieldType.INT32, BYTE_STRING, (int) BYTE_VALUE);
+ testSupportedConversion(FieldType.INT32, SHORT_STRING, (int) SHORT_VALUE);
+ testSupportedConversion(FieldType.INT32, INT_STRING, INT_VALUE);
}
@Test
public void testSupportedLongConversions() throws Exception {
- testSupportedConversion(INT64, BYTE_STRING, (long) BYTE_VALUE);
- testSupportedConversion(INT64, SHORT_STRING, (long) SHORT_VALUE);
- testSupportedConversion(INT64, INT_STRING, (long) INT_VALUE);
- testSupportedConversion(INT64, LONG_STRING, LONG_VALUE);
+ testSupportedConversion(FieldType.INT64, BYTE_STRING, (long) BYTE_VALUE);
+ testSupportedConversion(FieldType.INT64, SHORT_STRING, (long) SHORT_VALUE);
+ testSupportedConversion(FieldType.INT64, INT_STRING, (long) INT_VALUE);
+ testSupportedConversion(FieldType.INT64, LONG_STRING, LONG_VALUE);
}
@Test
public void testSupportedFloatConversions() throws Exception {
- testSupportedConversion(FLOAT, FLOAT_STRING, FLOAT_VALUE);
- testSupportedConversion(FLOAT, SHORT_STRING, (float) SHORT_VALUE);
+ testSupportedConversion(FieldType.FLOAT, FLOAT_STRING, FLOAT_VALUE);
+ testSupportedConversion(FieldType.FLOAT, SHORT_STRING, (float)
SHORT_VALUE);
}
@Test
public void testSupportedDoubleConversions() throws Exception {
- testSupportedConversion(DOUBLE, DOUBLE_STRING, DOUBLE_VALUE);
- testSupportedConversion(DOUBLE, FLOAT_STRING, (double) FLOAT_VALUE);
- testSupportedConversion(DOUBLE, INT_STRING, (double) INT_VALUE);
+ testSupportedConversion(FieldType.DOUBLE, DOUBLE_STRING, DOUBLE_VALUE);
+ testSupportedConversion(FieldType.DOUBLE, FLOAT_STRING, (double)
FLOAT_VALUE);
+ testSupportedConversion(FieldType.DOUBLE, INT_STRING, (double) INT_VALUE);
}
private void testSupportedConversion(
- TypeName fieldType,
+ FieldType fieldType,
String jsonFieldValue,
Object expectedRowFieldValue) throws Exception {
- String fieldName = "f_" + fieldType.name().toLowerCase();
+ String fieldName = "f_" + fieldType.getTypeName().name().toLowerCase();
Schema schema = schemaWithField(fieldName, fieldType);
Row expectedRow =
Row.withSchema(schema).addValues(expectedRowFieldValue).build();
ObjectMapper jsonParser =
newObjectMapperWith(RowJsonDeserializer.forSchema(schema));
@@ -464,84 +451,84 @@ private void testSupportedConversion(
@Test
public void testUnsupportedBooleanConversions() throws Exception {
- testUnsupportedConversion(BOOLEAN, quoted(BOOLEAN_TRUE_STRING));
- testUnsupportedConversion(BOOLEAN, BYTE_STRING);
- testUnsupportedConversion(BOOLEAN, SHORT_STRING);
- testUnsupportedConversion(BOOLEAN, INT_STRING);
- testUnsupportedConversion(BOOLEAN, LONG_STRING);
- testUnsupportedConversion(BOOLEAN, FLOAT_STRING);
- testUnsupportedConversion(BOOLEAN, DOUBLE_STRING);
+ testUnsupportedConversion(FieldType.BOOLEAN, quoted(BOOLEAN_TRUE_STRING));
+ testUnsupportedConversion(FieldType.BOOLEAN, BYTE_STRING);
+ testUnsupportedConversion(FieldType.BOOLEAN, SHORT_STRING);
+ testUnsupportedConversion(FieldType.BOOLEAN, INT_STRING);
+ testUnsupportedConversion(FieldType.BOOLEAN, LONG_STRING);
+ testUnsupportedConversion(FieldType.BOOLEAN, FLOAT_STRING);
+ testUnsupportedConversion(FieldType.BOOLEAN, DOUBLE_STRING);
}
@Test
public void testUnsupportedStringConversions() throws Exception {
- testUnsupportedConversion(STRING, BOOLEAN_TRUE_STRING);
- testUnsupportedConversion(STRING, BYTE_STRING);
- testUnsupportedConversion(STRING, SHORT_STRING);
- testUnsupportedConversion(STRING, INT_STRING);
- testUnsupportedConversion(STRING, LONG_STRING);
- testUnsupportedConversion(STRING, FLOAT_STRING);
- testUnsupportedConversion(STRING, DOUBLE_STRING);
+ testUnsupportedConversion(FieldType.STRING, BOOLEAN_TRUE_STRING);
+ testUnsupportedConversion(FieldType.STRING, BYTE_STRING);
+ testUnsupportedConversion(FieldType.STRING, SHORT_STRING);
+ testUnsupportedConversion(FieldType.STRING, INT_STRING);
+ testUnsupportedConversion(FieldType.STRING, LONG_STRING);
+ testUnsupportedConversion(FieldType.STRING, FLOAT_STRING);
+ testUnsupportedConversion(FieldType.STRING, DOUBLE_STRING);
}
@Test
public void testUnsupportedByteConversions() throws Exception {
- testUnsupportedConversion(BYTE, BOOLEAN_TRUE_STRING);
- testUnsupportedConversion(BYTE, quoted(BYTE_STRING));
- testUnsupportedConversion(BYTE, SHORT_STRING);
- testUnsupportedConversion(BYTE, INT_STRING);
- testUnsupportedConversion(BYTE, LONG_STRING);
- testUnsupportedConversion(BYTE, FLOAT_STRING);
- testUnsupportedConversion(BYTE, DOUBLE_STRING);
+ testUnsupportedConversion(FieldType.BYTE, BOOLEAN_TRUE_STRING);
+ testUnsupportedConversion(FieldType.BYTE, quoted(BYTE_STRING));
+ testUnsupportedConversion(FieldType.BYTE, SHORT_STRING);
+ testUnsupportedConversion(FieldType.BYTE, INT_STRING);
+ testUnsupportedConversion(FieldType.BYTE, LONG_STRING);
+ testUnsupportedConversion(FieldType.BYTE, FLOAT_STRING);
+ testUnsupportedConversion(FieldType.BYTE, DOUBLE_STRING);
}
@Test
public void testUnsupportedShortConversions() throws Exception {
- testUnsupportedConversion(INT16, BOOLEAN_TRUE_STRING);
- testUnsupportedConversion(INT16, quoted(SHORT_STRING));
- testUnsupportedConversion(INT16, INT_STRING);
- testUnsupportedConversion(INT16, LONG_STRING);
- testUnsupportedConversion(INT16, FLOAT_STRING);
- testUnsupportedConversion(INT16, DOUBLE_STRING);
+ testUnsupportedConversion(FieldType.INT16, BOOLEAN_TRUE_STRING);
+ testUnsupportedConversion(FieldType.INT16, quoted(SHORT_STRING));
+ testUnsupportedConversion(FieldType.INT16, INT_STRING);
+ testUnsupportedConversion(FieldType.INT16, LONG_STRING);
+ testUnsupportedConversion(FieldType.INT16, FLOAT_STRING);
+ testUnsupportedConversion(FieldType.INT16, DOUBLE_STRING);
}
@Test
public void testUnsupportedIntConversions() throws Exception {
- testUnsupportedConversion(INT32, quoted(INT_STRING));
- testUnsupportedConversion(INT32, BOOLEAN_TRUE_STRING);
- testUnsupportedConversion(INT32, LONG_STRING);
- testUnsupportedConversion(INT32, FLOAT_STRING);
- testUnsupportedConversion(INT32, DOUBLE_STRING);
+ testUnsupportedConversion(FieldType.INT32, quoted(INT_STRING));
+ testUnsupportedConversion(FieldType.INT32, BOOLEAN_TRUE_STRING);
+ testUnsupportedConversion(FieldType.INT32, LONG_STRING);
+ testUnsupportedConversion(FieldType.INT32, FLOAT_STRING);
+ testUnsupportedConversion(FieldType.INT32, DOUBLE_STRING);
}
@Test
public void testUnsupportedLongConversions() throws Exception {
- testUnsupportedConversion(INT64, quoted(LONG_STRING));
- testUnsupportedConversion(INT64, BOOLEAN_TRUE_STRING);
- testUnsupportedConversion(INT64, FLOAT_STRING);
- testUnsupportedConversion(INT64, DOUBLE_STRING);
+ testUnsupportedConversion(FieldType.INT64, quoted(LONG_STRING));
+ testUnsupportedConversion(FieldType.INT64, BOOLEAN_TRUE_STRING);
+ testUnsupportedConversion(FieldType.INT64, FLOAT_STRING);
+ testUnsupportedConversion(FieldType.INT64, DOUBLE_STRING);
}
@Test
public void testUnsupportedFloatConversions() throws Exception {
- testUnsupportedConversion(FLOAT, quoted(FLOAT_STRING));
- testUnsupportedConversion(FLOAT, BOOLEAN_TRUE_STRING);
- testUnsupportedConversion(FLOAT, DOUBLE_STRING);
- testUnsupportedConversion(FLOAT, INT_STRING); // too large to fit
+ testUnsupportedConversion(FieldType.FLOAT, quoted(FLOAT_STRING));
+ testUnsupportedConversion(FieldType.FLOAT, BOOLEAN_TRUE_STRING);
+ testUnsupportedConversion(FieldType.FLOAT, DOUBLE_STRING);
+ testUnsupportedConversion(FieldType.FLOAT, INT_STRING); // too large to fit
}
@Test
public void testUnsupportedDoubleConversions() throws Exception {
- testUnsupportedConversion(DOUBLE, quoted(DOUBLE_STRING));
- testUnsupportedConversion(DOUBLE, BOOLEAN_TRUE_STRING);
- testUnsupportedConversion(DOUBLE, LONG_STRING); // too large to fit
+ testUnsupportedConversion(FieldType.DOUBLE, quoted(DOUBLE_STRING));
+ testUnsupportedConversion(FieldType.DOUBLE, BOOLEAN_TRUE_STRING);
+ testUnsupportedConversion(FieldType.DOUBLE, LONG_STRING); // too large to
fit
}
private void testUnsupportedConversion(
- TypeName fieldType,
+ FieldType fieldType,
String jsonFieldValue) throws Exception {
- String fieldName = "f_" + fieldType.name().toLowerCase();
+ String fieldName = "f_" + fieldType.getTypeName().name().toLowerCase();
ObjectMapper jsonParser =
newObjectMapperWith(RowJsonDeserializer
@@ -558,11 +545,11 @@ private String quoted(String string) {
return "\"" + string + "\"";
}
- private Schema schemaWithField(String fieldName, TypeName fieldType) {
+ private Schema schemaWithField(String fieldName, FieldType fieldType) {
return
Schema
.builder()
- .addField(fieldName, fieldType.type())
+ .addField(fieldName, fieldType)
.build();
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java
index 25c84230fa9..b411cbeaa77 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java
@@ -32,7 +32,7 @@
import java.util.Map;
import java.util.stream.Stream;
import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.Rule;
@@ -52,11 +52,11 @@ public void testCreatesNullRecord() {
Schema type =
Stream
.of(
- Schema.Field.of("f_int", TypeName.INT32.type())
+ Schema.Field.of("f_int", FieldType.INT32)
.withNullable(true),
- Schema.Field.of("f_str", TypeName.STRING.type())
+ Schema.Field.of("f_str", FieldType.STRING)
.withNullable(true),
- Schema.Field.of("f_double", TypeName.DOUBLE.type())
+ Schema.Field.of("f_double", FieldType.DOUBLE)
.withNullable(true))
.collect(toSchema());
@@ -69,7 +69,7 @@ public void testCreatesNullRecord() {
@Test
public void testRejectsNullRecord() {
- Schema type = Stream.of(Schema.Field.of("f_int", TypeName.INT32.type()))
+ Schema type = Stream.of(Schema.Field.of("f_int", Schema.FieldType.INT32))
.collect(toSchema());
thrown.expect(IllegalArgumentException.class);
Row.nullRow(type);
@@ -116,15 +116,15 @@ public void testCreatesRecord() {
@Test
public void testCreatesNestedRow() {
Schema nestedType = Stream.of(
- Schema.Field.of("f1_str", TypeName.STRING.type()))
+ Schema.Field.of("f1_str", Schema.FieldType.STRING))
.collect(toSchema());
Schema type =
Stream
- .of(Schema.Field.of("f_int", TypeName.INT32.type()),
+ .of(Schema.Field.of("f_int", Schema.FieldType.INT32),
Schema.Field.of("nested",
- TypeName.ROW.type()
- .withRowSchema(nestedType)))
+ Schema.FieldType.row(
+ nestedType)))
.collect(toSchema());
Row nestedRow = Row.withSchema(nestedType).addValues("foobar").build();
Row row = Row.withSchema(type).addValues(42, nestedRow).build();
@@ -137,8 +137,7 @@ public void testCreatesArray() {
List<Integer> data = Lists.newArrayList(2, 3, 5, 7);
Schema type = Stream
.of(Schema.Field.of("array",
- TypeName.ARRAY.type()
- .withCollectionElementType(TypeName.INT32.type())))
+ Schema.FieldType.array(Schema.FieldType.INT32)))
.collect(toSchema());
Row row = Row.withSchema(type).addArray(data).build();
assertEquals(data, row.getArray("array"));
@@ -147,7 +146,7 @@ public void testCreatesArray() {
@Test
public void testCreatesRowArray() {
Schema nestedType = Stream.of(
- Schema.Field.of("f1_str", TypeName.STRING.type()))
+ Schema.Field.of("f1_str", FieldType.STRING))
.collect(toSchema());
List<Row> data = Lists.newArrayList(
Row.withSchema(nestedType).addValues("one").build(),
@@ -156,9 +155,7 @@ public void testCreatesRowArray() {
Schema type = Stream
.of(Schema.Field.of("array",
- TypeName.ARRAY.type()
- .withCollectionElementType(TypeName.ROW.type()
- .withRowSchema(nestedType))))
+ FieldType.array(FieldType.row(nestedType))))
.collect(toSchema());
Row row = Row.withSchema(type).addArray(data).build();
assertEquals(data, row.getArray("array"));
@@ -170,9 +167,7 @@ public void testCreatesArrayArray() {
Lists.newArrayList(1, 2, 3, 4));
Schema type = Stream
.of(Schema.Field.of("array",
- TypeName.ARRAY.type()
- .withCollectionElementType(TypeName.ARRAY.type()
- .withCollectionElementType(TypeName.INT32.type()))))
+ FieldType.array(FieldType.array(FieldType.INT32))))
.collect(toSchema());
Row row = Row.withSchema(type).addArray(data).build();
assertEquals(data, row.getArray("array"));
@@ -186,8 +181,7 @@ public void testCreatesArrayOfMap() {
.build();
Schema type = Stream
.of(Schema.Field.of("array",
- TypeName.ARRAY.type().withCollectionElementType(
- TypeName.MAP.type().withMapType(TypeName.INT32.type(),
TypeName.STRING.type()))))
+ FieldType.array(FieldType.map(FieldType.INT32, FieldType.STRING))))
.collect(toSchema());
Row row = Row.withSchema(type).addArray(data).build();
assertEquals(data, row.getArray("array"));
@@ -203,7 +197,7 @@ public void testCreateMapWithPrimitiveValue() {
.build();
Schema type = Stream
.of(Schema.Field.of("map",
- TypeName.MAP.type().withMapType(TypeName.INT32.type(),
TypeName.STRING.type())))
+ FieldType.map(FieldType.INT32, FieldType.STRING)))
.collect(toSchema());
Row row = Row.withSchema(type).addValue(data).build();
assertEquals(data, row.getMap("map"));
@@ -217,8 +211,8 @@ public void testCreateMapWithArrayValue() {
.build();
Schema type = Stream
.of(Schema.Field.of("map",
- TypeName.MAP.type().withMapType(TypeName.INT32.type(),
-
TypeName.ARRAY.type().withCollectionElementType(TypeName.STRING.type()))))
+ FieldType.map(FieldType.INT32,
+ FieldType.array(FieldType.STRING))))
.collect(toSchema());
Row row = Row.withSchema(type).addValue(data).build();
assertEquals(data, row.getMap("map"));
@@ -232,8 +226,7 @@ public void testCreateMapWithMapValue() {
.build();
Schema type = Stream
.of(Schema.Field.of("map",
- TypeName.MAP.type().withMapType(TypeName.INT32.type(),
- TypeName.MAP.type().withMapType(TypeName.INT32.type(),
TypeName.STRING.type()))))
+ FieldType.map(FieldType.INT32, FieldType.map(FieldType.INT32,
FieldType.STRING))))
.collect(toSchema());
Row row = Row.withSchema(type).addValue(data).build();
assertEquals(data, row.getMap("map"));
@@ -241,15 +234,15 @@ public void testCreateMapWithMapValue() {
@Test
public void testCreateMapWithRowValue() {
- Schema nestedType = Stream.of(Schema.Field.of("f1_str",
TypeName.STRING.type()))
+ Schema nestedType = Stream.of(Schema.Field.of("f1_str", FieldType.STRING))
.collect(toSchema());
Map<Integer, Row> data = ImmutableMap.<Integer, Row>builder()
.put(1, Row.withSchema(nestedType).addValues("one").build())
.put(2, Row.withSchema(nestedType).addValues("two").build())
.build();
- Schema type = Stream.of(Schema.Field.of("map", TypeName.MAP.type()
- .withMapType(TypeName.INT32.type(),
- TypeName.ROW.type().withRowSchema(nestedType)))).collect(toSchema());
+ Schema type = Stream.of(Schema.Field.of("map", FieldType.map(
+ FieldType.INT32,
+ FieldType.row(nestedType)))).collect(toSchema());
Row row = Row.withSchema(type).addValue(data).build();
assertEquals(data, row.getMap("map"));
}
@@ -259,9 +252,9 @@ public void testCollector() {
Schema type =
Stream
.of(
- Schema.Field.of("f_int", TypeName.INT32.type()),
- Schema.Field.of("f_str", TypeName.STRING.type()),
- Schema.Field.of("f_double", TypeName.DOUBLE.type()))
+ Schema.Field.of("f_int", FieldType.INT32),
+ Schema.Field.of("f_str", FieldType.STRING),
+ Schema.Field.of("f_double", FieldType.DOUBLE))
.collect(toSchema());
Row row =
@@ -279,9 +272,9 @@ public void testThrowsForIncorrectNumberOfFields() {
Schema type =
Stream
.of(
- Schema.Field.of("f_int", TypeName.INT32.type()),
- Schema.Field.of("f_str", TypeName.STRING.type()),
- Schema.Field.of("f_double", TypeName.DOUBLE.type()))
+ Schema.Field.of("f_int", FieldType.INT32),
+ Schema.Field.of("f_str", FieldType.STRING),
+ Schema.Field.of("f_double", FieldType.DOUBLE))
.collect(toSchema());
thrown.expect(IllegalArgumentException.class);
diff --git a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
index be6f5864fc6..c7f1b014304 100644
--- a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
+++ b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
@@ -218,8 +218,7 @@ Schema.FieldType Array() :
{
<ARRAY> <LT> arrayElementType = FieldType() <GT>
{
- return Schema.TypeName.ARRAY.type()
- .withCollectionElementType(arrayElementType);
+ return Schema.FieldType.array(arrayElementType);
}
}
@@ -237,8 +236,7 @@ Schema.FieldType Map() :
mapValueType = FieldType()
<GT>
{
- return Schema.TypeName.MAP.type()
- .withMapType(mapKeyType, mapValueType);
+ return Schema.FieldType.map(mapKeyType, mapValueType);
}
}
@@ -250,8 +248,7 @@ Schema.FieldType Row() :
<ROW> fields = RowFields()
{
Schema rowSchema = Schema.builder().addFields(fields).build();
- return Schema.TypeName.ROW.type()
- .withRowSchema(rowSchema);
+ return Schema.FieldType.row(rowSchema);
}
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index 74d3007e15c..9321d02f13a 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -27,7 +27,6 @@
import java.util.stream.IntStream;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
@@ -99,15 +98,22 @@ public static Schema toBeamSchema(RelDataType tableInfo) {
}
public static SqlTypeName toSqlTypeName(FieldType type) {
- SqlTypeName typeName =
- BEAM_TO_CALCITE_TYPE_MAPPING.get(
-
type.withCollectionElementType(null).withRowSchema(null).withMapType(null,
null));
- if (typeName != null) {
- return typeName;
- } else {
- // This will happen e.g. if looking up a STRING type, and metadata isn't
set to say which
- // type of SQL string we want. In this case, use the default mapping.
- return BEAM_TO_CALCITE_DEFAULT_MAPPING.get(type);
+ switch (type.getTypeName()) {
+ case ROW:
+ return SqlTypeName.ROW;
+ case ARRAY:
+ return SqlTypeName.ARRAY;
+ case MAP:
+ return SqlTypeName.MAP;
+ default:
+ SqlTypeName typeName = BEAM_TO_CALCITE_TYPE_MAPPING.get(type);
+ if (typeName != null) {
+ return typeName;
+ } else {
+ // This will happen e.g. if looking up a STRING type, and metadata
isn't set to say which
+ // type of SQL string we want. In this case, use the default mapping.
+ return BEAM_TO_CALCITE_DEFAULT_MAPPING.get(type);
+ }
}
}
@@ -123,7 +129,7 @@ public static FieldType toFieldType(SqlTypeName
sqlTypeName) {
+ "so it cannot be converted to a %s",
sqlTypeName, Schema.FieldType.class.getSimpleName()));
default:
- return
CALCITE_TO_BEAM_TYPE_MAPPING.get(sqlTypeName).getTypeName().type();
+ return
CALCITE_TO_BEAM_TYPE_MAPPING.get(sqlTypeName).withMetadata((byte[]) null);
}
}
@@ -143,26 +149,6 @@ public static FieldType toFieldType(RelDataType
calciteType) {
}
}
- public static FieldType toArrayType(SqlTypeName collectionElementType) {
- return
TypeName.ARRAY.type().withCollectionElementType(toFieldType(collectionElementType));
- }
-
- public static FieldType toArrayType(RelDataType collectionElementType) {
- return
TypeName.ARRAY.type().withCollectionElementType(toFieldType(collectionElementType));
- }
-
- public static FieldType toMapType(SqlTypeName componentKeyType, SqlTypeName
componentValueType) {
- return TypeName.MAP
- .type()
- .withMapType(toFieldType(componentKeyType),
toFieldType(componentValueType));
- }
-
- public static FieldType toMapType(RelDataType componentKeyType, RelDataType
componentValueType) {
- return TypeName.MAP
- .type()
- .withMapType(toFieldType(componentKeyType),
toFieldType(componentValueType));
- }
-
public static Schema.Field toBeamSchemaField(RelDataTypeField calciteField) {
FieldType fieldType = toFieldType(calciteField.getType());
// TODO: We should support Calcite's nullable annotations.
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
index cb320e6d0b5..a49a9915697 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
@@ -22,7 +22,6 @@
import static
org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubMessageToRow.ATTRIBUTES_FIELD;
import static
org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubMessageToRow.PAYLOAD_FIELD;
import static
org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubMessageToRow.TIMESTAMP_FIELD;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.MAP;
import static org.apache.beam.sdk.schemas.Schema.TypeName.ROW;
import com.alibaba.fastjson.JSONObject;
@@ -70,7 +69,7 @@ private void validatePubsubMessageSchema(Table
tableDefinition) {
if (schema.getFieldCount() != 3
|| !fieldPresent(schema, TIMESTAMP_FIELD, TIMESTAMP)
- || !fieldPresent(schema, ATTRIBUTES_FIELD,
MAP.type().withMapType(VARCHAR, VARCHAR))
+ || !fieldPresent(schema, ATTRIBUTES_FIELD,
Schema.FieldType.map(VARCHAR, VARCHAR))
|| !(schema.hasField(PAYLOAD_FIELD)
&&
ROW.equals(schema.getField(PAYLOAD_FIELD).getType().getTypeName()))) {
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
index d218b73e8b7..b98a76b4441 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
@@ -20,9 +20,6 @@
import static
org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.BOOLEAN;
import static
org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.INTEGER;
import static
org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.VARCHAR;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.ARRAY;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.MAP;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.ROW;
import static org.apache.beam.sdk.schemas.Schema.toSchema;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -86,14 +83,8 @@ public void testExecute_createTableWithPrefixArrayField()
throws Exception {
Field.of("id",
INTEGER).withDescription("id").withNullable(true),
Field.of("name",
VARCHAR).withDescription("name").withNullable(true),
Field.of("age",
INTEGER).withDescription("age").withNullable(true),
- Field.of("tags",
ARRAY.type().withCollectionElementType(VARCHAR))
- .withNullable(true),
- Field.of(
- "matrix",
- ARRAY
- .type()
- .withCollectionElementType(
-
ARRAY.type().withCollectionElementType(INTEGER)))
+ Field.of("tags",
Schema.FieldType.array(VARCHAR)).withNullable(true),
+ Field.of("matrix",
Schema.FieldType.array(Schema.FieldType.array(INTEGER)))
.withNullable(true))
.collect(toSchema()),
table.getSchema());
@@ -122,10 +113,10 @@ public void testExecute_createTableWithPrefixMapField()
throws Exception {
Field.of("id",
INTEGER).withDescription("id").withNullable(true),
Field.of("name",
VARCHAR).withDescription("name").withNullable(true),
Field.of("age",
INTEGER).withDescription("age").withNullable(true),
- Field.of("tags", MAP.type().withMapType(VARCHAR,
VARCHAR)).withNullable(true),
+ Field.of("tags", Schema.FieldType.map(VARCHAR,
VARCHAR)).withNullable(true),
Field.of(
"nestedmap",
- MAP.type().withMapType(INTEGER,
MAP.type().withMapType(VARCHAR, INTEGER)))
+ Schema.FieldType.map(INTEGER,
Schema.FieldType.map(VARCHAR, INTEGER)))
.withNullable(true))
.collect(toSchema()),
table.getSchema());
@@ -163,21 +154,19 @@ public void testExecute_createTableWithRowField() throws
Exception {
Field.of("age",
INTEGER).withDescription("age").withNullable(true),
Field.of(
"address",
- ROW.type()
- .withRowSchema(
- Schema.builder()
- .addNullableField("street",
Schema.FieldType.STRING)
- .addNullableField("country",
Schema.FieldType.STRING)
- .build()))
+ Schema.FieldType.row(
+ Schema.builder()
+ .addNullableField("street",
Schema.FieldType.STRING)
+ .addNullableField("country",
Schema.FieldType.STRING)
+ .build()))
.withNullable(true),
Field.of(
"addressangular",
- ROW.type()
- .withRowSchema(
- Schema.builder()
- .addNullableField("street",
Schema.FieldType.STRING)
- .addNullableField("country",
Schema.FieldType.STRING)
- .build()))
+ Schema.FieldType.row(
+ Schema.builder()
+ .addNullableField("street",
Schema.FieldType.STRING)
+ .addNullableField("country",
Schema.FieldType.STRING)
+ .build()))
.withNullable(true),
Field.of("isrobot", BOOLEAN).withNullable(true))
.collect(toSchema()),
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
index ca5409fecf3..52714e57bb4 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
@@ -29,7 +29,6 @@
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.junit.Test;
@@ -147,7 +146,7 @@ private static Table mockTable(
.location(location)
.schema(
Stream.of(
- Schema.Field.of("id", TypeName.INT32.type())
+ Schema.Field.of("id", CalciteUtils.INTEGER)
.withNullable(true)
.withDescription("id"),
Schema.Field.of("name", CalciteUtils.VARCHAR)
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java
index b5ae44a6936..3aa9089759f 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java
@@ -26,7 +26,6 @@
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.junit.Test;
/** UnitTest for {@link BigQueryTableProvider}. */
@@ -57,7 +56,7 @@ private static Table fakeTable(String name) {
.location("project:dataset.table")
.schema(
Stream.of(
- Schema.Field.nullable("id", TypeName.INT32.type()),
+ Schema.Field.nullable("id", Schema.FieldType.INT32),
Schema.Field.nullable("name", Schema.FieldType.STRING))
.collect(toSchema()))
.type("bigquery")
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
index 6d32744a8bc..66a5bee57c4 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
@@ -29,7 +29,6 @@
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.junit.Test;
/** UnitTest for {@link KafkaTableProvider}. */
@@ -68,7 +67,7 @@ private static Table mockTable(String name) {
.location("kafka://localhost:2181/brokers?topic=test")
.schema(
Stream.of(
- Schema.Field.nullable("id", TypeName.INT32.type()),
+ Schema.Field.nullable("id", Schema.FieldType.INT32),
Schema.Field.nullable("name", Schema.FieldType.STRING))
.collect(toSchema()))
.type("kafka")
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
index d4dbca55808..9211297b4f3 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
@@ -27,7 +27,6 @@
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.commons.csv.CSVFormat;
import org.junit.Test;
@@ -76,7 +75,7 @@ private static Table mockTable(String name, String format) {
.location("/home/admin/" + name)
.schema(
Stream.of(
- Schema.Field.nullable("id", TypeName.INT32.type()),
+ Schema.Field.nullable("id", Schema.FieldType.INT32),
Schema.Field.nullable("name", Schema.FieldType.STRING))
.collect(toSchema()))
.type("text")
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
index 90f93fec2e7..e2bbf264fa0 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
@@ -31,7 +31,6 @@
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider;
import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;
@@ -122,7 +121,7 @@ private static Table mockTable(String name, String type) {
.location("/home/admin/" + name)
.schema(
Stream.of(
- Schema.Field.nullable("id", TypeName.INT32.type()),
+ Schema.Field.nullable("id", Schema.FieldType.INT32),
Schema.Field.nullable("name", Schema.FieldType.STRING))
.collect(toSchema()))
.type(type)
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/utils/QuickCheckGenerators.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/utils/QuickCheckGenerators.java
index 28ee7ae5adb..bc7a1989636 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/utils/QuickCheckGenerators.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/utils/QuickCheckGenerators.java
@@ -46,7 +46,7 @@
private static final List<FieldType> PRIMITIVE_TYPES =
java.util.Arrays.stream(TypeName.values())
.filter(TypeName::isPrimitiveType)
- .map(TypeName::type)
+ .map(FieldType::of)
.collect(toList());
@Override
@@ -59,7 +59,7 @@ public FieldType generateFieldType(SourceOfRandomness random,
GenerationStatus s
public static class Arrays extends FieldTypeGenerator {
@Override
public FieldType generateFieldType(SourceOfRandomness random,
GenerationStatus status) {
- return
TypeName.ARRAY.type().withCollectionElementType(ANY_TYPE.generate(random,
status));
+ return FieldType.array(ANY_TYPE.generate(random, status));
}
}
@@ -69,9 +69,8 @@ public FieldType generateFieldType(SourceOfRandomness random,
GenerationStatus s
public static class Maps extends FieldTypeGenerator {
@Override
public FieldType generateFieldType(SourceOfRandomness random,
GenerationStatus status) {
- return TypeName.MAP
- .type()
- .withMapType(PRIMITIVE_TYPES.generate(random, status),
ANY_TYPE.generate(random, status));
+ return FieldType.map(
+ PRIMITIVE_TYPES.generate(random, status), ANY_TYPE.generate(random,
status));
}
}
@@ -83,9 +82,7 @@ public FieldType generateFieldType(SourceOfRandomness random,
GenerationStatus s
FieldTypeGenerator rowFieldTypesGenerator =
(nestingLevel(status) >= 10) ? PRIMITIVE_TYPES : ANY_TYPE;
- return TypeName.ROW
- .type()
- .withRowSchema(generateSchema(rowFieldTypesGenerator, random,
status));
+ return FieldType.row(generateSchema(rowFieldTypesGenerator, random,
status));
}
private Schema generateSchema(
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index 888d7af63a7..0f4804f4d38 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -52,7 +52,7 @@
private static final Schema ARRAY_TYPE = Schema
.builder()
- .addArrayField("ids", Schema.TypeName.INT64.type())
+ .addArrayField("ids", Schema.FieldType.INT64)
.build();
private static final Schema ROW_TYPE = Schema
@@ -62,7 +62,7 @@
private static final Schema ARRAY_ROW_TYPE =
Schema.builder()
- .addArrayField("rows",
Schema.FieldType.of(Schema.TypeName.ROW).withRowSchema(FLAT_TYPE))
+ .addArrayField("rows", Schema.FieldType.row(FLAT_TYPE))
.build();
private static final TableFieldSchema ID =
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 107512)
Time Spent: 0.5h (was: 20m)
> Schema followups
> ----------------
>
> Key: BEAM-4076
> URL: https://issues.apache.org/jira/browse/BEAM-4076
> Project: Beam
> Issue Type: Improvement
> Components: beam-model, dsl-sql, sdk-java-core
> Reporter: Kenneth Knowles
> Assignee: Kenneth Knowles
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> This umbrella bug contains subtasks with followups for Beam schemas, which
> were moved from SQL to the core Java SDK and made to be type-name-based
> rather than coder based.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)