http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java new file mode 100644 index 0000000..a6b965d --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record; + +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +import java.util.Date; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +public class MapRecord implements Record { + private final RecordSchema schema; + private final Map<String, Object> values; + + public MapRecord(final RecordSchema schema, final Map<String, Object> values) { + this.schema = Objects.requireNonNull(schema); + this.values = Objects.requireNonNull(values); + } + + @Override + public RecordSchema getSchema() { + return schema; + } + + @Override + public Object[] getValues() { + final Object[] values = new Object[schema.getFieldCount()]; + int i = 0; + for (final RecordField recordField : schema.getFields()) { + values[i++] = getValue(recordField); + } + return values; + } + + @Override + public Object getValue(final String fieldName) { + final Optional<RecordField> fieldOption = schema.getField(fieldName); + if (fieldOption.isPresent()) { + return getValue(fieldOption.get()); + } + + return null; + } + + @Override + public Object getValue(final RecordField field) { + Object explicitValue = getExplicitValue(field); + if (explicitValue != null) { + return explicitValue; + } + + final Optional<RecordField> resolvedField = resolveField(field); + final boolean resolvedFieldDifferent = resolvedField.isPresent() && !resolvedField.get().equals(field); + if (resolvedFieldDifferent) { + explicitValue = getExplicitValue(resolvedField.get()); + if (explicitValue != null) { + return explicitValue; + } + } + + Object defaultValue = field.getDefaultValue(); + if (defaultValue != null) { + return defaultValue; + } + + if (resolvedFieldDifferent) { + return resolvedField.get().getDefaultValue(); + } + + return null; + } + + private Optional<RecordField> resolveField(final RecordField field) { + Optional<RecordField> resolved = schema.getField(field.getFieldName()); + if (resolved.isPresent()) { + return resolved; + } + + for (final String alias : field.getAliases()) { + resolved = schema.getField(alias); + if (resolved.isPresent()) { + return resolved; + } + } + + return Optional.empty(); + } + + private Object getExplicitValue(final RecordField field) { + final String canonicalFieldName = field.getFieldName(); + + // We use containsKey here instead of just calling get() and checking for a null value + // because if the true field name is set to null, we want to return null, rather than + // what the alias points to. Likewise for a specific alias, since aliases are defined + // in a List with a specific ordering. + Object value = values.get(canonicalFieldName); + if (value != null) { + return value; + } + + for (final String alias : field.getAliases()) { + value = values.get(alias); + if (value != null) { + return value; + } + } + + return null; + } + + @Override + public String getAsString(final String fieldName) { + final Optional<DataType> dataTypeOption = schema.getDataType(fieldName); + if (!dataTypeOption.isPresent()) { + return null; + } + + return convertToString(getValue(fieldName), dataTypeOption.get().getFormat()); + } + + @Override + public String getAsString(final String fieldName, final String format) { + return convertToString(getValue(fieldName), format); + } + + @Override + public String getAsString(final RecordField field, final String format) { + return convertToString(getValue(field), format); + } + + private String getFormat(final String optionalFormat, final RecordFieldType fieldType) { + return (optionalFormat == null) ? fieldType.getDefaultFormat() : optionalFormat; + } + + private String convertToString(final Object value, final String format) { + if (value == null) { + return null; + } + + final String dateFormat = getFormat(format, RecordFieldType.DATE); + final String timestampFormat = getFormat(format, RecordFieldType.TIMESTAMP); + final String timeFormat = getFormat(format, RecordFieldType.TIME); + return DataTypeUtils.toString(value, dateFormat, timeFormat, timestampFormat); + } + + @Override + public Long getAsLong(final String fieldName) { + return DataTypeUtils.toLong(getValue(fieldName), fieldName); + } + + @Override + public Integer getAsInt(final String fieldName) { + return DataTypeUtils.toInteger(getValue(fieldName), fieldName); + } + + @Override + public Double getAsDouble(final String fieldName) { + return DataTypeUtils.toDouble(getValue(fieldName), fieldName); + } + + @Override + public Float getAsFloat(final String fieldName) { + return DataTypeUtils.toFloat(getValue(fieldName), fieldName); + } + + @Override + public Record getAsRecord(String fieldName, final RecordSchema schema) { + return DataTypeUtils.toRecord(getValue(fieldName), schema, fieldName); + } + + @Override + public Boolean getAsBoolean(final String fieldName) { + return DataTypeUtils.toBoolean(getValue(fieldName), fieldName); + } + + @Override + public Date getAsDate(final String fieldName, final String format) { + return DataTypeUtils.toDate(getValue(fieldName), format, fieldName); + } + + @Override + public Object[] getAsArray(final String fieldName) { + return DataTypeUtils.toArray(getValue(fieldName), fieldName); + } + + + @Override + public int hashCode() { + return 31 + 41 * values.hashCode() + 7 * schema.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof MapRecord)) { + return false; + } + final MapRecord other = (MapRecord) obj; + return schema.equals(other.schema) && values.equals(other.values); + } + + @Override + public String toString() { + return "MapRecord[values=" + values + "]"; + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/PushBackRecordSet.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/PushBackRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/PushBackRecordSet.java new file mode 100644 index 0000000..a186611 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/PushBackRecordSet.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record; + +import java.io.IOException; + +public class PushBackRecordSet implements RecordSet { + private final RecordSet original; + private Record pushback; + + public PushBackRecordSet(final RecordSet original) { + this.original = original; + } + + @Override + public RecordSchema getSchema() throws IOException { + return original.getSchema(); + } + + @Override + public Record next() throws IOException { + if (pushback != null) { + final Record record = pushback; + pushback = null; + return record; + } + + return original.next(); + } + + public void pushback(final Record record) { + if (pushback != null) { + throw new IllegalStateException("RecordSet already has a Record pushed back. Cannot push back more than one record at a time."); + } + + this.pushback = record; + } + + public boolean isAnotherRecord() throws IOException { + if (pushback != null) { + return true; + } + + final Record nextRecord = next(); + if (nextRecord == null) { + return false; + } + + pushback(nextRecord); + return true; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java new file mode 100644 index 0000000..5e5e7ba --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record; + +import java.util.Date; + +public interface Record { + + RecordSchema getSchema(); + + /** + * <p> + * Returns a view of the the values of the fields in this Record. + * </p> + * + * <b>NOTE:</b> The array that is returned may be an underlying array that is backing + * the contents of the Record. As such, modifying the array in any way may result in + * modifying the record. + * + * @return a view of the values of the fields in this Record + */ + Object[] getValues(); + + Object getValue(String fieldName); + + Object getValue(RecordField field); + + String getAsString(String fieldName); + + String getAsString(String fieldName, String format); + + String getAsString(RecordField field, String format); + + Long getAsLong(String fieldName); + + Integer getAsInt(String fieldName); + + Double getAsDouble(String fieldName); + + Float getAsFloat(String fieldName); + + Record getAsRecord(String fieldName, RecordSchema schema); + + Boolean getAsBoolean(String fieldName); + + Date getAsDate(String fieldName, String format); + + Object[] getAsArray(String fieldName); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java new file mode 100644 index 0000000..dc68b01 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record; + +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +import java.util.Collections; +import java.util.Objects; +import java.util.Set; + +public class RecordField { + private final String fieldName; + private final DataType dataType; + private final Set<String> aliases; + private final Object defaultValue; + + public RecordField(final String fieldName, final DataType dataType) { + this(fieldName, dataType, null, Collections.emptySet()); + } + + public RecordField(final String fieldName, final DataType dataType, final Object defaultValue) { + this(fieldName, dataType, defaultValue, Collections.emptySet()); + } + + public RecordField(final String fieldName, final DataType dataType, final Set<String> aliases) { + this(fieldName, dataType, null, aliases); + } + + public RecordField(final String fieldName, final DataType dataType, final Object defaultValue, final Set<String> aliases) { + if (defaultValue != null && !DataTypeUtils.isCompatibleDataType(defaultValue, dataType)) { + throw new IllegalArgumentException("Cannot set the default value for field [" + fieldName + "] to [" + defaultValue + + "] because that is not a valid value for Data Type [" + dataType + "]"); + } + + this.fieldName = Objects.requireNonNull(fieldName); + this.dataType = Objects.requireNonNull(dataType); + this.aliases = Collections.unmodifiableSet(Objects.requireNonNull(aliases)); + this.defaultValue = defaultValue; + } + + public String getFieldName() { + return fieldName; + } + + public Set<String> getAliases() { + return aliases; + } + + public DataType getDataType() { + return dataType; + } + + public Object getDefaultValue() { + return defaultValue; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + dataType.hashCode(); + result = prime * result + fieldName.hashCode(); + result = prime * result + aliases.hashCode(); + result = prime * result + ((defaultValue == null) ? 0 : defaultValue.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + RecordField other = (RecordField) obj; + return dataType.equals(other.getDataType()) && fieldName.equals(other.getFieldName()) && aliases.equals(other.getAliases()) && Objects.equals(defaultValue, other.defaultValue); + } + + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java new file mode 100644 index 0000000..2e6b5d7 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record; + +import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.ChoiceDataType; +import org.apache.nifi.serialization.record.type.MapDataType; +import org.apache.nifi.serialization.record.type.RecordDataType; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public enum RecordFieldType { + /** + * A String field type. Fields of this type use a {@code java.lang.String} value. + */ + STRING("string"), + + /** + * A boolean field type. Fields of this type use a {@code boolean} value. + */ + BOOLEAN("boolean"), + + /** + * A byte field type. Fields of this type use a {@code byte} value. + */ + BYTE("byte"), + + /** + * A char field type. Fields of this type use a {@code char} value. + */ + CHAR("char"), + + /** + * A short field type. Fields of this type use a {@code short} value. + */ + SHORT("short"), + + /** + * An int field type. Fields of this type use an {@code int} value. + */ + INT("int"), + + /** + * A bigint field type. Fields of this type use a {@code java.math.BigInteger} value. + */ + BIGINT("bigint"), + + /** + * A long field type. Fields of this type use a {@code long} value. + */ + LONG("long"), + + /** + * A float field type. Fields of this type use a {@code float} value. + */ + FLOAT("float"), + + /** + * A double field type. Fields of this type use a {@code double} value. + */ + DOUBLE("double"), + + /** + * A date field type. Fields of this type use a {@code java.sql.Date} value. + */ + DATE("date", "yyyy-MM-dd"), + + /** + * A time field type. Fields of this type use a {@code java.sql.Time} value. + */ + TIME("time", "HH:mm:ss"), + + /** + * A timestamp field type. Fields of this type use a {@code java.sql.Timestamp} value. + */ + TIMESTAMP("timestamp", "yyyy-MM-dd HH:mm:ss"), + + /** + * <p> + * A record field type. Fields of this type use a {@code org.apache.nifi.serialization.record.Record} value. A Record DataType should be + * created by providing the {@link RecordSchema} for the record: + * </p> + * + * <code> + * final DataType recordType = RecordFieldType.RECORD.getRecordDataType(recordSchema); + * </code> + * + * <p> + * A field of type RECORD should always have a {@link RecordDataType}, so the following idiom is acceptable for use: + * </p> + * + * <code> + * <pre> + * final DataType dataType = ...; + * if (dataType.getFieldType() == RecordFieldType.RECORD) { + * final RecordDataType recordDataType = (RecordDataType) dataType; + * final RecordSchema childSchema = recordDataType.getChildSchema(); + * ... + * } + * </pre> + * </code> + */ + RECORD("record", null, new RecordDataType(null)), + + /** + * <p> + * A choice field type. A field of type choice can be one of any number of different types, which are defined by the DataType that is used. + * For example, if a field should allow either a Long or an Integer, this can be accomplished by using: + * </p> + * + * <code> + * final DataType choiceType = RecordFieldType.CHOICE.getChoiceDataType( RecordFieldType.INT.getDataType(), RecordFieldType.LONG.getDataType() ); + * </code> + * + * <p> + * A field of type CHOICE should always have a {@link ChoiceDataType}, so the following idiom is acceptable for use: + * </p> + * + * <code> + * <pre> + * final DataType dataType = ...; + * if (dataType.getFieldType() == RecordFieldType.CHOICE) { + * final ChoiceDataType choiceDataType = (ChoiceDataType) dataType; + * final List<DataType> allowableTypes = choiceDataType.getPossibleSubTypes(); + * ... + * } + * </pre> + * </code> + */ + CHOICE("choice", null, new ChoiceDataType(Collections.emptyList())), + + /** + * <p> + * An array field type. Fields of this type use a {@code Object[]} value. Note that we are explicitly indicating that + * Object[] should be used here and not primitive array types. For instance, setting a value of {@code int[]} is not allowed. The DataType for + * this field should be created using the {@link #getArrayDataType(DataType)} method: + * </p> + * + * <code> + * final DataType arrayType = RecordFieldType.ARRAY.getArrayDataType( RecordFieldType.INT.getDataType() ); + * </code> + * + * <p> + * A field of type ARRAY should always have an {@link ArrayDataType}, so the following idiom is acceptable for use: + * </p> + * + * <code> + * <pre> + * final DataType dataType = ...; + * if (dataType.getFieldType() == RecordFieldType.ARRAY) { + * final ArrayDataType arrayDataType = (ArrayDataType) dataType; + * final DataType elementType = arrayDataType.getElementType(); + * ... + * } + * </pre> + * </code> + */ + ARRAY("array", null, new ArrayDataType(null)), + + /** + * <p> + * A record field type. Fields of this type use a {@code Map<String, Object>} value. A Map DataType should be + * created by providing the {@link DataType} for the values: + * </p> + * + * <code> + * final DataType recordType = RecordFieldType.MAP.getRecordDataType( RecordFieldType.STRING.getDataType() ); + * </code> + * + * <p> + * A field of type MAP should always have a {@link MapDataType}, so the following idiom is acceptable for use: + * </p> + * + * <code> + * <pre> + * final DataType dataType = ...; + * if (dataType.getFieldType() == RecordFieldType.MAP) { + * final MapDataType mapDataType = (MapDataType) dataType; + * final DataType valueType = mapDataType.getValueType(); + * ... + * } + * </pre> + * </code> + */ + MAP("map", null, new MapDataType(null)); + + + private static final Map<String, RecordFieldType> SIMPLE_NAME_MAP = new HashMap<String, RecordFieldType>(); + + static { + for (RecordFieldType value : values()) { + SIMPLE_NAME_MAP.put(value.simpleName, value); + } + } + + private final String simpleName; + private final String defaultFormat; + private final DataType defaultDataType; + + private RecordFieldType(final String simpleName) { + this(simpleName, null); + } + + private RecordFieldType(final String simpleName, final String defaultFormat) { + this.simpleName = simpleName; + this.defaultFormat = defaultFormat; + this.defaultDataType = new DataType(this, defaultFormat); + } + + private RecordFieldType(final String simpleName, final String defaultFormat, final DataType defaultDataType) { + this.simpleName = simpleName; + this.defaultFormat = defaultFormat; + this.defaultDataType = defaultDataType; + } + + public String getDefaultFormat() { + return defaultFormat; + } + + /** + * @return the DataType with the default format + */ + public DataType getDataType() { + return defaultDataType; + } + + public DataType getDataType(final String format) { + return new DataType(this, format); + } + + /** + * Returns a Data Type that represents a "RECORD" or "ARRAY" type with the given schema. + * + * @param childSchema the Schema for the Record or Array + * @return a DataType that represents a Record or Array with the given schema, or <code>null</code> if this RecordFieldType + * is not the RECORD or ARRAY type. + */ + public DataType getRecordDataType(final RecordSchema childSchema) { + if (this != RECORD) { + return null; + } + + return new RecordDataType(childSchema); + } + + /** + * Returns a Data Type that represents an "ARRAY" type with the given element type. + * + * @param elementType the type of the arrays in the element + * @return a DataType that represents an Array with the given element type, or <code>null</code> if this RecordFieldType + * is not the ARRAY type. + */ + public DataType getArrayDataType(final DataType elementType) { + if (this != ARRAY) { + return null; + } + + return new ArrayDataType(elementType); + } + + + /** + * Returns a Data Type that represents a "CHOICE" of multiple possible types. This method is + * only applicable for a RecordFieldType of {@link #CHOICE}. + * + * @param possibleChildTypes the possible types that are allowable + * @return a DataType that represents a "CHOICE" of multiple possible types, or <code>null</code> if this RecordFieldType + * is not the CHOICE type. + */ + public DataType getChoiceDataType(final List<DataType> possibleChildTypes) { + if (this != CHOICE) { + return null; + } + + return new ChoiceDataType(possibleChildTypes); + } + + /** + * Returns a Data Type that represents a "CHOICE" of multiple possible types. This method is + * only applicable for a RecordFieldType of {@link #CHOICE}. + * + * @param possibleChildTypes the possible types that are allowable + * @return a DataType that represents a "CHOICE" of multiple possible types, or <code>null</code> if this RecordFieldType + * is not the CHOICE type. + */ + public DataType getChoiceDataType(final DataType... possibleChildTypes) { + if (this != CHOICE) { + return null; + } + + final List<DataType> list = new ArrayList<>(possibleChildTypes.length); + for (final DataType type : possibleChildTypes) { + list.add(type); + } + + return new ChoiceDataType(list); + } + + /** + * Returns a Data Type that represents a "MAP" type with the given value type. + * + * @param valueDataType the type of the values in the map + * @return a DataType that represents a Map with the given value type, or <code>null</code> if this RecordFieldType + * is not the MAP type. + */ + public DataType getMapDataType(final DataType valueDataType) { + if (this != MAP) { + return null; + } + + return new MapDataType(valueDataType); + } + + + public static RecordFieldType of(final String typeString) { + return SIMPLE_NAME_MAP.get(typeString); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java new file mode 100644 index 0000000..367f2b0 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record; + +import java.util.List; +import java.util.Optional; + +public interface RecordSchema { + /** + * @return the list of fields that are present in the schema + */ + List<RecordField> getFields(); + + /** + * @return the number of fields in the schema + */ + int getFieldCount(); + + /** + * @param index the 0-based index of which field to return + * @return the index'th field + * + * @throws IndexOutOfBoundsException if the index is < 0 or >= the number of fields (determined by {@link #getFieldCount()}). + */ + RecordField getField(int index); + + /** + * @return the data types of the fields + */ + List<DataType> getDataTypes(); + + /** + * @return the names of the fields + */ + List<String> getFieldNames(); + + /** + * @param fieldName the name of the field whose type is desired + * @return the RecordFieldType associated with the field that has the given name, or + * <code>null</code> if the schema does not contain a field with the given name + */ + Optional<DataType> getDataType(String fieldName); + + /** + * @return the textual representation of the schema, if one is available + */ + Optional<String> getSchemaText(); + + /** + * @return the format of the schema text, if schema text is present + */ + Optional<String> getSchemaFormat(); + + /** + * @param fieldName the name of the field + * @return an Optional RecordField for the field with the given name + */ + Optional<RecordField> getField(String fieldName); + + /** + * @return the SchemaIdentifier, which provides various attributes for identifying a schema + */ + SchemaIdentifier getIdentifier(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSet.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSet.java new file mode 100644 index 0000000..9e67346 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSet.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record; + +import java.io.IOException; + +public interface RecordSet { + + /** + * @return the {@link RecordSchema} that applies to the records in this RecordSet + */ + RecordSchema getSchema() throws IOException; + + /** + * @return the next {@link Record} in the set or <code>null</code> if there are no more records + */ + Record next() throws IOException; + + /** + * Returns a new Record Set that will return no more than {@code maxRecords} records from this + * RecordSet. Any Records that are pulled from this newly created RecordSet will also advance + * the cursor in this Record Set and vice versa. + * + * @param maxRecords the maximum number of records to return from the new RecordSet + * @return a view of this RecordSet that limits the number of records returned + */ + default RecordSet limit(final int maxRecords) { + if (maxRecords < 0) { + throw new IllegalArgumentException("Cannot limit number of records to " + maxRecords + ". Limit must be a non-negative integer"); + } + + final RecordSet original = this; + return new RecordSet() { + private int count = 0; + + @Override + public RecordSchema getSchema() throws IOException { + return original.getSchema(); + } + + @Override + public Record next() throws IOException { + if (count >= maxRecords) { + return null; + } + + final Record record = original.next(); + if (record != null) { + count++; + } + + return record; + } + }; + } + + public static RecordSet of(final RecordSchema schema, final Record... records) { + return new RecordSet() { + private int index = 0; + + @Override + public RecordSchema getSchema() { + return schema; + } + + @Override + public Record next() { + if (index >= records.length) { + return null; + } + + return records[index++]; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java new file mode 100644 index 0000000..b6daab7 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record; + +import java.io.Closeable; +import java.io.IOException; +import java.math.BigInteger; +import java.sql.Array; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Types; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ResultSetRecordSet implements RecordSet, Closeable { + private static final Logger logger = LoggerFactory.getLogger(ResultSetRecordSet.class); + private final ResultSet rs; + private final RecordSchema schema; + private final Set<String> rsColumnNames; + private boolean moreRows; + + public ResultSetRecordSet(final ResultSet rs) throws SQLException { + this.rs = rs; + moreRows = rs.next(); + this.schema = createSchema(rs); + + rsColumnNames = new HashSet<>(); + final ResultSetMetaData metadata = rs.getMetaData(); + for (int i = 0; i < metadata.getColumnCount(); i++) { + rsColumnNames.add(metadata.getColumnLabel(i + 1)); + } + } + + @Override + public RecordSchema getSchema() { + return schema; + } + + @Override + public Record next() throws IOException { + try { + if (moreRows) { + final Record record = createRecord(rs); + moreRows = rs.next(); + return record; + } else { + return null; + } + } catch (final SQLException e) { + throw new IOException("Could not obtain next record from ResultSet", e); + } + } + + @Override + public void close() { + try { + rs.close(); + } catch (final SQLException e) { + logger.error("Failed to close ResultSet", e); + } + } + + private Record createRecord(final ResultSet rs) throws SQLException { + final Map<String, Object> values = new HashMap<>(schema.getFieldCount()); + + for (final RecordField field : schema.getFields()) { + final String fieldName = field.getFieldName(); + + final Object value; + if (rsColumnNames.contains(fieldName)) { + value = normalizeValue(rs.getObject(fieldName)); + } else { + value = null; + } + + values.put(fieldName, value); + } + + return new MapRecord(schema, values); + } + + @SuppressWarnings("rawtypes") + private Object normalizeValue(final Object value) { + if (value == null) { + return null; + } + + if (value instanceof List) { + return ((List) value).toArray(); + } + + return value; + } + + private static RecordSchema createSchema(final ResultSet rs) throws SQLException { + final ResultSetMetaData metadata = rs.getMetaData(); + final int numCols = metadata.getColumnCount(); + final List<RecordField> fields = new ArrayList<>(numCols); + + for (int i = 0; i < numCols; i++) { + final int column = i + 1; + final int sqlType = metadata.getColumnType(column); + + final DataType dataType = getDataType(sqlType, rs, column); + final String fieldName = metadata.getColumnLabel(column); + final RecordField field = new RecordField(fieldName, dataType); + fields.add(field); + } + + return new SimpleRecordSchema(fields); + } + + private static DataType getDataType(final int sqlType, final ResultSet rs, final int columnIndex) throws SQLException { + switch (sqlType) { + case Types.ARRAY: + // The JDBC API does not allow us to know what the base type of an array is through the metadata. + // As a result, we have to obtain the actual Array for this record. Once we have this, we can determine + // the base type. However, if the base type is, itself, an array, we will simply return a base type of + // String because otherwise, we need the ResultSet for the array itself, and many JDBC Drivers do not + // support calling Array.getResultSet() and will throw an Exception if that is not supported. + if (rs.isAfterLast()) { + return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()); + } + + final Array array = rs.getArray(columnIndex); + if (array == null) { + return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()); + } + + final DataType baseType = getArrayBaseType(array); + return RecordFieldType.ARRAY.getArrayDataType(baseType); + case Types.BINARY: + case Types.LONGVARBINARY: + case Types.VARBINARY: + return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()); + case Types.OTHER: + // If we have no records to inspect, we can't really know its schema so we simply use the default data type. + if (rs.isAfterLast()) { + return RecordFieldType.RECORD.getDataType(); + } + + final Object obj = rs.getObject(columnIndex); + if (obj == null || !(obj instanceof Record)) { + return RecordFieldType.RECORD.getDataType(); + } + + final Record record = (Record) obj; + final RecordSchema recordSchema = record.getSchema(); + return RecordFieldType.RECORD.getRecordDataType(recordSchema); + default: + return getFieldType(sqlType).getDataType(); + } + } + + private static DataType getArrayBaseType(final Array array) throws SQLException { + final Object arrayValue = array.getArray(); + if (arrayValue == null) { + return RecordFieldType.STRING.getDataType(); + } + + if (arrayValue instanceof byte[]) { + return RecordFieldType.BYTE.getDataType(); + } + if (arrayValue instanceof int[]) { + return RecordFieldType.INT.getDataType(); + } + if (arrayValue instanceof long[]) { + return RecordFieldType.LONG.getDataType(); + } + if (arrayValue instanceof boolean[]) { + return RecordFieldType.BOOLEAN.getDataType(); + } + if (arrayValue instanceof short[]) { + return RecordFieldType.SHORT.getDataType(); + } + if (arrayValue instanceof byte[]) { + return RecordFieldType.BYTE.getDataType(); + } + if (arrayValue instanceof float[]) { + return RecordFieldType.FLOAT.getDataType(); + } + if (arrayValue instanceof double[]) { + return RecordFieldType.DOUBLE.getDataType(); + } + if (arrayValue instanceof char[]) { + return RecordFieldType.CHAR.getDataType(); + } + if (arrayValue instanceof Object[]) { + final Object[] values = (Object[]) arrayValue; + if (values.length == 0) { + return RecordFieldType.STRING.getDataType(); + } + + Object valueToLookAt = null; + for (int i = 0; i < values.length; i++) { + valueToLookAt = values[i]; + if (valueToLookAt != null) { + break; + } + } + if (valueToLookAt == null) { + return RecordFieldType.STRING.getDataType(); + } + + if (valueToLookAt instanceof String) { + return RecordFieldType.STRING.getDataType(); + } + if (valueToLookAt instanceof Long) { + return RecordFieldType.LONG.getDataType(); + } + if (valueToLookAt instanceof Integer) { + return RecordFieldType.INT.getDataType(); + } + if (valueToLookAt instanceof Short) { + return RecordFieldType.SHORT.getDataType(); + } + if (valueToLookAt instanceof Byte) { + return RecordFieldType.BYTE.getDataType(); + } + if (valueToLookAt instanceof Float) { + return RecordFieldType.FLOAT.getDataType(); + } + if (valueToLookAt instanceof Double) { + return RecordFieldType.DOUBLE.getDataType(); + } + if (valueToLookAt instanceof Boolean) { + return RecordFieldType.BOOLEAN.getDataType(); + } + if (valueToLookAt instanceof Character) { + return RecordFieldType.CHAR.getDataType(); + } + if (valueToLookAt instanceof BigInteger) { + return RecordFieldType.BIGINT.getDataType(); + } + if (valueToLookAt instanceof Integer) { + return RecordFieldType.INT.getDataType(); + } + if (valueToLookAt instanceof java.sql.Time) { + return RecordFieldType.TIME.getDataType(); + } + if (valueToLookAt instanceof java.sql.Date) { + return RecordFieldType.DATE.getDataType(); + } + if (valueToLookAt instanceof java.sql.Timestamp) { + return RecordFieldType.TIMESTAMP.getDataType(); + } + if (valueToLookAt instanceof Record) { + return RecordFieldType.RECORD.getDataType(); + } + } + + return RecordFieldType.STRING.getDataType(); + } + + + private static RecordFieldType getFieldType(final int sqlType) { + switch (sqlType) { + case Types.BIGINT: + case Types.ROWID: + return RecordFieldType.LONG; + case Types.BIT: + case Types.BOOLEAN: + return RecordFieldType.BOOLEAN; + case Types.CHAR: + return RecordFieldType.CHAR; + case Types.DATE: + return RecordFieldType.DATE; + case Types.DECIMAL: + case Types.DOUBLE: + case Types.NUMERIC: + case Types.REAL: + return RecordFieldType.DOUBLE; + case Types.FLOAT: + return RecordFieldType.FLOAT; + case Types.INTEGER: + return RecordFieldType.INT; + case Types.SMALLINT: + return RecordFieldType.SHORT; + case Types.TINYINT: + return RecordFieldType.BYTE; + case Types.LONGNVARCHAR: + case Types.LONGVARCHAR: + case Types.NCHAR: + case Types.NULL: + case Types.NVARCHAR: + case Types.VARCHAR: + return RecordFieldType.STRING; + case Types.OTHER: + case Types.JAVA_OBJECT: + return RecordFieldType.RECORD; + case Types.TIME: + case Types.TIME_WITH_TIMEZONE: + return RecordFieldType.TIME; + case Types.TIMESTAMP: + case Types.TIMESTAMP_WITH_TIMEZONE: + return RecordFieldType.TIMESTAMP; + } + + return RecordFieldType.STRING; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java new file mode 100644 index 0000000..d7f5664 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record; + +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; + +public interface SchemaIdentifier { + + /** + * @return the name of the schema, if one has been defined. + */ + Optional<String> getName(); + + /** + * @return the identifier of the schema, if one has been defined. + */ + OptionalLong getIdentifier(); + + /** + * @return the version of the schema, if one has been defined. + */ + OptionalInt getVersion(); + + + public static SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null); + + public static SchemaIdentifier ofName(final String name) { + return new StandardSchemaIdentifier(name, null, null); + } + + public static SchemaIdentifier of(final String name, final long identifier, final int version) { + return new StandardSchemaIdentifier(name, identifier, version); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java new file mode 100644 index 0000000..86db284 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record; + +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; + +public class StandardSchemaIdentifier implements SchemaIdentifier { + private final Optional<String> name; + private final OptionalLong identifier; + private final OptionalInt version; + + StandardSchemaIdentifier(final String name, final Long identifier, final Integer version) { + this.name = Optional.ofNullable(name); + this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier);; + this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version);; + } + + @Override + public Optional<String> getName() { + return name; + } + + @Override + public OptionalLong getIdentifier() { + return identifier; + } + + @Override + public OptionalInt getVersion() { + return version; + } + + @Override + public int hashCode() { + return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof SchemaIdentifier)) { + return false; + } + final SchemaIdentifier other = (SchemaIdentifier) obj; + return getName().equals(other.getName()) && getIdentifier().equals(other.getIdentifier()) && getVersion().equals(other.getVersion()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java new file mode 100644 index 0000000..af5f909 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record; + +public class TypeMismatchException extends RuntimeException { + public TypeMismatchException(String message) { + super(message); + } + + public TypeMismatchException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java new file mode 100644 index 0000000..46dc447 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record.type; + +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordFieldType; + +import java.util.Objects; + +public class ArrayDataType extends DataType { + private final DataType elementType; + + public ArrayDataType(final DataType elementType) { + super(RecordFieldType.ARRAY, null); + this.elementType = elementType; + } + + public DataType getElementType() { + return elementType; + } + + @Override + public RecordFieldType getFieldType() { + return RecordFieldType.ARRAY; + } + + @Override + public int hashCode() { + return 31 + 41 * getFieldType().hashCode() + 41 * (elementType == null ? 0 : elementType.hashCode()); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof ArrayDataType)) { + return false; + } + + final ArrayDataType other = (ArrayDataType) obj; + return getFieldType().equals(other.getFieldType()) && Objects.equals(elementType, other.elementType); + } + + @Override + public String toString() { + return "ARRAY[" + elementType + "]"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java new file mode 100644 index 0000000..9fcdf73 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record.type; + +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordFieldType; + +import java.util.List; +import java.util.Objects; + +public class ChoiceDataType extends DataType { + private final List<DataType> possibleSubTypes; + + public ChoiceDataType(final List<DataType> possibleSubTypes) { + super(RecordFieldType.CHOICE, null); + this.possibleSubTypes = Objects.requireNonNull(possibleSubTypes); + } + + public List<DataType> getPossibleSubTypes() { + return possibleSubTypes; + } + + @Override + public RecordFieldType getFieldType() { + return RecordFieldType.CHOICE; + } + + @Override + public int hashCode() { + return 31 + 41 * getFieldType().hashCode() + 41 * (possibleSubTypes == null ? 0 : possibleSubTypes.hashCode()); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof ChoiceDataType)) { + return false; + } + + final ChoiceDataType other = (ChoiceDataType) obj; + return getFieldType().equals(other.getFieldType()) && Objects.equals(possibleSubTypes, other.possibleSubTypes); + } + + @Override + public String toString() { + return "CHOICE" + possibleSubTypes; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java new file mode 100644 index 0000000..5ed1c39 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record.type; + +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordFieldType; + +import java.util.Objects; + +public class MapDataType extends DataType { + private final DataType valueType; + + public MapDataType(final DataType elementType) { + super(RecordFieldType.MAP, null); + this.valueType = elementType; + } + + public DataType getValueType() { + return valueType; + } + + @Override + public RecordFieldType getFieldType() { + return RecordFieldType.MAP; + } + + @Override + public int hashCode() { + return 31 + 41 * getFieldType().hashCode() + 41 * (valueType == null ? 0 : valueType.hashCode()); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof MapDataType)) { + return false; + } + + final MapDataType other = (MapDataType) obj; + return getValueType().equals(other.getValueType()) && Objects.equals(valueType, other.valueType); + } + + @Override + public String toString() { + return "MAP[" + valueType + "]"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java new file mode 100644 index 0000000..fc6993f --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record.type; + +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; + +import java.util.Objects; + +public class RecordDataType extends DataType { + private final RecordSchema childSchema; + + public RecordDataType(final RecordSchema childSchema) { + super(RecordFieldType.RECORD, null); + this.childSchema = childSchema; + } + + @Override + public RecordFieldType getFieldType() { + return RecordFieldType.RECORD; + } + + public RecordSchema getChildSchema() { + return childSchema; + } + + @Override + public int hashCode() { + return 31 + 41 * getFieldType().hashCode() + 41 * (childSchema == null ? 0 : childSchema.hashCode()); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof RecordDataType)) { + return false; + } + + final RecordDataType other = (RecordDataType) obj; + return getFieldType().equals(other.getFieldType()) && Objects.equals(childSchema, other.childSchema); + } + + @Override + public String toString() { + return RecordFieldType.RECORD.toString(); + } +}
