http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java index 41469ba..8e1c7ed 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java @@ -17,31 +17,22 @@ package org.apache.nifi.processors.standard; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; -import java.util.Map; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processors.standard.util.record.MockRecordParser; +import org.apache.nifi.processors.standard.util.record.MockRecordWriter; import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.serialization.MalformedRecordException; -import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; -import org.apache.nifi.serialization.RowRecordReaderFactory; -import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.WriteResult; -import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; -import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSet; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -247,58 +238,14 @@ public class TestQueryFlowFile { Assert.assertEquals(columnNames, colNames); - return WriteResult.of(0, Collections.emptyMap()); - } - - @Override - public String getMimeType() { - return "text/plain"; - } - - @Override - public WriteResult write(Record record, OutputStream out) throws IOException { - return null; - } - }; - } - - } - - private static class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory { - private final String header; - - public MockRecordWriter(final String header) { - this.header = header; - } - - @Override - public RecordSetWriter createWriter(final ComponentLog logger) { - return new RecordSetWriter() { - @Override - public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException { - out.write(header.getBytes()); - out.write("\n".getBytes()); - - int recordCount = 0; - final int numCols = rs.getSchema().getFieldCount(); - Record record = null; + // Iterate over the rest of the records to ensure that we read the entire stream. If we don't + // do this, we won't consume all of the data and as a result we will not close the stream properly + Record record; while ((record = rs.next()) != null) { - recordCount++; - int i = 0; - for (final String fieldName : record.getSchema().getFieldNames()) { - final String val = record.getAsString(fieldName); - out.write("\"".getBytes()); - out.write(val.getBytes()); - out.write("\"".getBytes()); - - if (i++ < numCols - 1) { - out.write(",".getBytes()); - } - } - out.write("\n".getBytes()); + System.out.println(record); } - return WriteResult.of(recordCount, Collections.emptyMap()); + return WriteResult.of(0, Collections.emptyMap()); } @Override @@ -312,68 +259,7 @@ public class TestQueryFlowFile { } }; } - } - - private static class MockRecordParser extends AbstractControllerService implements RowRecordReaderFactory { - private final List<Object[]> records = new ArrayList<>(); - private final List<RecordField> fields = new ArrayList<>(); - private final int failAfterN; - public MockRecordParser() { - this(-1); - } - - public MockRecordParser(final int failAfterN) { - this.failAfterN = failAfterN; - } - - - public void addSchemaField(final String fieldName, final RecordFieldType type) { - fields.add(new RecordField(fieldName, type.getDataType())); - } - - public void addRecord(Object... values) { - records.add(values); - } - - @Override - public RecordReader createRecordReader(InputStream in, ComponentLog logger) throws IOException { - final Iterator<Object[]> itr = records.iterator(); - - return new RecordReader() { - private int recordCount = 0; - - @Override - public void close() throws IOException { - } - - @Override - public Record nextRecord() throws IOException, MalformedRecordException { - if (failAfterN >= recordCount) { - throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read"); - } - recordCount++; - - if (!itr.hasNext()) { - return null; - } - - final Object[] values = itr.next(); - final Map<String, Object> valueMap = new HashMap<>(); - int i = 0; - for (final RecordField field : fields) { - final String fieldName = field.getFieldName(); - valueMap.put(fieldName, values[i++]); - } - - return new MapRecord(new SimpleRecordSchema(fields), valueMap); - } - - @Override - public RecordSchema getSchema() { - return new SimpleRecordSchema(fields); - } - }; - } } + }
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java new file mode 100644 index 0000000..1a39b82 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard.util.record; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RowRecordReaderFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; + +public class MockRecordParser extends AbstractControllerService implements RowRecordReaderFactory { + private final List<Object[]> records = new ArrayList<>(); + private final List<RecordField> fields = new ArrayList<>(); + private final int failAfterN; + + public MockRecordParser() { + this(-1); + } + + public MockRecordParser(final int failAfterN) { + this.failAfterN = failAfterN; + } + + + public void addSchemaField(final String fieldName, final RecordFieldType type) { + fields.add(new RecordField(fieldName, type.getDataType())); + } + + public void addRecord(Object... values) { + records.add(values); + } + + @Override + public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws IOException { + final Iterator<Object[]> itr = records.iterator(); + + return new RecordReader() { + private int recordCount = 0; + + @Override + public void close() throws IOException { + } + + @Override + public Record nextRecord() throws IOException, MalformedRecordException { + if (failAfterN >= recordCount) { + throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read"); + } + recordCount++; + + if (!itr.hasNext()) { + return null; + } + + final Object[] values = itr.next(); + final Map<String, Object> valueMap = new HashMap<>(); + int i = 0; + for (final RecordField field : fields) { + final String fieldName = field.getFieldName(); + valueMap.put(fieldName, values[i++]); + } + + return new MapRecord(new SimpleRecordSchema(fields), valueMap); + } + + @Override + public RecordSchema getSchema() { + return new SimpleRecordSchema(fields); + } + }; + } + + @Override + public RecordSchema getSchema(FlowFile flowFile) throws MalformedRecordException, IOException { + return null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java new file mode 100644 index 0000000..1cf2a28 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard.util.record; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collections; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSet; + +public class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory { + private final String header; + + public MockRecordWriter(final String header) { + this.header = header; + } + + @Override + public RecordSetWriter createWriter(final ComponentLog logger) { + return new RecordSetWriter() { + @Override + public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException { + out.write(header.getBytes()); + out.write("\n".getBytes()); + + int recordCount = 0; + final int numCols = rs.getSchema().getFieldCount(); + Record record = null; + while ((record = rs.next()) != null) { + recordCount++; + int i = 0; + for (final String fieldName : record.getSchema().getFieldNames()) { + final String val = record.getAsString(fieldName); + out.write("\"".getBytes()); + out.write(val.getBytes()); + out.write("\"".getBytes()); + + if (i++ < numCols - 1) { + out.write(",".getBytes()); + } + } + out.write("\n".getBytes()); + } + + return WriteResult.of(recordCount, Collections.emptyMap()); + } + + @Override + public String getMimeType() { + return "text/plain"; + } + + @Override + public WriteResult write(Record record, OutputStream out) throws IOException { + return null; + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/pom.xml index d7d5605..78c0381 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/pom.xml @@ -17,7 +17,7 @@ <parent> <groupId>org.apache.nifi</groupId> <artifactId>nifi-standard-services</artifactId> - <version>1.1.0-SNAPSHOT</version> + <version>1.2.0-SNAPSHOT</version> </parent> <artifactId>nifi-record-serialization-service-api</artifactId> http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java index a0cfc79..b728498 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java @@ -39,7 +39,6 @@ public interface RecordReader extends Closeable { /** * Returns the next record in the stream or <code>null</code> if no more records are available. * - * @param schema the schema to use in order to determine how to interprets the fields in a record * @return the next record in the stream or <code>null</code> if no more records are available. * * @throws IOException if unable to read from the underlying data http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordWriter.java index eef8d82..aa298d9 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordWriter.java @@ -26,7 +26,7 @@ public interface RecordWriter { /** * Writes the given result set to the given output stream * - * @param recordSet the record set to serialize + * @param record the record set to serialize * @param out the OutputStream to write to * @return the results of writing the data * @throws IOException if unable to write to the given OutputStream http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java index 5ef4c7c..fbd8a21 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java @@ -21,7 +21,9 @@ import java.io.IOException; import java.io.InputStream; import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.record.RecordSchema; /** * <p> @@ -29,5 +31,8 @@ import org.apache.nifi.logging.ComponentLog; * </p> */ public interface RowRecordReaderFactory extends ControllerService { - RecordReader createRecordReader(InputStream in, ComponentLog logger) throws MalformedRecordException, IOException; + + RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException; + + RecordSchema getSchema(FlowFile flowFile) throws MalformedRecordException, IOException; } http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java index 0c187f1..b72c107 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java @@ -17,36 +17,15 @@ package org.apache.nifi.serialization.record; -import java.util.Collections; -import java.util.List; -import java.util.Optional; - public class DataType { private final RecordFieldType fieldType; private final String format; - private final RecordSchema childSchema; - private final List<DataType> childTypes; - - DataType(final RecordFieldType fieldType, final String format) { - this(fieldType, format, (RecordSchema) null); - } - - DataType(final RecordFieldType fieldType, final String format, final RecordSchema childSchema) { + protected DataType(final RecordFieldType fieldType, final String format) { this.fieldType = fieldType; this.format = format; - this.childSchema = childSchema; - this.childTypes = Collections.emptyList(); } - DataType(final RecordFieldType fieldType, final String format, final List<DataType> childTypes) { - this.fieldType = fieldType; - this.format = format; - this.childSchema = null; - this.childTypes = Collections.unmodifiableList(childTypes); - } - - public String getFormat() { return format; } @@ -55,14 +34,6 @@ public class DataType { return fieldType; } - public Optional<RecordSchema> getChildRecordSchema() { - return Optional.ofNullable(childSchema); - } - - public List<DataType> getPossibleTypes() { - return childTypes; - } - @Override public int hashCode() { return 31 + 41 * fieldType.hashCode() + 41 * (format == null ? 0 : format.hashCode()); http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java index f3f9024..0bbb534 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java @@ -17,16 +17,13 @@ package org.apache.nifi.serialization.record; -import java.sql.Time; -import java.sql.Timestamp; -import java.text.ParseException; -import java.text.SimpleDateFormat; import java.util.Date; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + public class MapRecord implements Record { private final RecordSchema schema; private final Map<String, Object> values; @@ -80,220 +77,52 @@ public class MapRecord implements Record { return null; } - if (value instanceof java.sql.Date) { - java.sql.Date date = (java.sql.Date) value; - final long time = date.getTime(); - return new SimpleDateFormat(getFormat(format, RecordFieldType.DATE)).format(new java.util.Date(time)); - } - if (value instanceof java.util.Date) { - return new SimpleDateFormat(getFormat(format, RecordFieldType.DATE)).format((java.util.Date) value); - } - if (value instanceof Timestamp) { - java.sql.Timestamp date = (java.sql.Timestamp) value; - final long time = date.getTime(); - return new SimpleDateFormat(getFormat(format, RecordFieldType.TIMESTAMP)).format(new java.util.Date(time)); - } - if (value instanceof Time) { - java.sql.Time date = (java.sql.Time) value; - final long time = date.getTime(); - return new SimpleDateFormat(getFormat(format, RecordFieldType.TIME)).format(new java.util.Date(time)); - } - - return value.toString(); + final String dateFormat = getFormat(format, RecordFieldType.DATE); + final String timestampFormat = getFormat(format, RecordFieldType.TIMESTAMP); + final String timeFormat = getFormat(format, RecordFieldType.TIME); + return DataTypeUtils.toString(value, dateFormat, timeFormat, timestampFormat); } @Override public Long getAsLong(final String fieldName) { - return convertToLong(getValue(fieldName), fieldName); - } - - private Long convertToLong(final Object value, final Object fieldDesc) { - if (value == null) { - return null; - } - - if (value instanceof Number) { - return ((Number) value).longValue(); - } - if (value instanceof String) { - return Long.parseLong((String) value); - } - if (value instanceof Date) { - return ((Date) value).getTime(); - } - - throw new TypeMismatchException("Cannot convert value of type " + value.getClass() + " to Long for field " + fieldDesc); + return DataTypeUtils.toLong(getValue(fieldName)); } @Override public Integer getAsInt(final String fieldName) { - return convertToInt(getValue(fieldName), fieldName); - } - - private Integer convertToInt(final Object value, final Object fieldDesc) { - if (value == null) { - return null; - } - - if (value instanceof Number) { - return ((Number) value).intValue(); - } - if (value instanceof String) { - return Integer.parseInt((String) value); - } - - throw new TypeMismatchException("Cannot convert value of type " + value.getClass() + " to Integer for field " + fieldDesc); + return DataTypeUtils.toInteger(getValue(fieldName)); } - @Override public Double getAsDouble(final String fieldName) { - return convertToDouble(getValue(fieldName), fieldName); - } - - private Double convertToDouble(final Object value, final Object fieldDesc) { - if (value == null) { - return null; - } - - if (value instanceof Number) { - return ((Number) value).doubleValue(); - } - if (value instanceof String) { - return Double.parseDouble((String) value); - } - - throw new TypeMismatchException("Cannot convert value of type " + value.getClass() + " to Double for field " + fieldDesc); + return DataTypeUtils.toDouble(getValue(fieldName)); } @Override public Float getAsFloat(final String fieldName) { - return convertToFloat(getValue(fieldName), fieldName); - } - - private Float convertToFloat(final Object value, final Object fieldDesc) { - if (value == null) { - return null; - } - - if (value instanceof Number) { - return ((Number) value).floatValue(); - } - if (value instanceof String) { - return Float.parseFloat((String) value); - } - - throw new TypeMismatchException("Cannot convert value of type " + value.getClass() + " to Float for field " + fieldDesc); + return DataTypeUtils.toFloat(getValue(fieldName)); } @Override - public Record getAsRecord(String fieldName) { - return convertToRecord(getValue(fieldName), fieldName); - } - - private Record convertToRecord(final Object value, final Object fieldDesc) { - if (value == null) { - return null; - } - - if (value instanceof Record) { - return (Record) value; - } - - throw new TypeMismatchException("Cannot convert value of type " + value.getClass() + " to Record for field " + fieldDesc); + public Record getAsRecord(String fieldName, final RecordSchema schema) { + return DataTypeUtils.toRecord(getValue(fieldName), schema); } - @Override public Boolean getAsBoolean(final String fieldName) { - return convertToBoolean(getValue(fieldName), fieldName); - } - - private Boolean convertToBoolean(final Object value, final Object fieldDesc) { - if (value == null) { - return null; - } - - if (value instanceof Boolean) { - return (Boolean) value; - } - if (value instanceof String) { - final String string = (String) value; - if (string.equalsIgnoreCase("true") || string.equalsIgnoreCase("t")) { - return Boolean.TRUE; - } - - if (string.equalsIgnoreCase("false") || string.equals("f")) { - return Boolean.FALSE; - } - - throw new TypeMismatchException("Cannot convert String value to Boolean for field " + fieldDesc + " because it is not a valid boolean value"); - } - - throw new TypeMismatchException("Cannot convert value of type " + value.getClass() + " to Boolean for field " + fieldDesc); - } - - @Override - public Date getAsDate(final String fieldName) { - final Optional<DataType> dataTypeOption = schema.getDataType(fieldName); - if (!dataTypeOption.isPresent()) { - return null; - } - - return convertToDate(getValue(fieldName), fieldName, dataTypeOption.get().getFormat()); + return DataTypeUtils.toBoolean(getValue(fieldName)); } @Override public Date getAsDate(final String fieldName, final String format) { - return convertToDate(getValue(fieldName), fieldName, format); - } - - private Date convertToDate(final Object value, final Object fieldDesc, final String format) { - if (value == null) { - return null; - } - - if (value instanceof Date) { - return (Date) value; - } - if (value instanceof Number) { - final Long time = ((Number) value).longValue(); - return new Date(time); - } - if (value instanceof java.sql.Date) { - return new Date(((java.sql.Date) value).getTime()); - } - if (value instanceof String) { - try { - return new SimpleDateFormat(getFormat(format, RecordFieldType.DATE)).parse((String) value); - } catch (final ParseException e) { - throw new TypeMismatchException("Cannot convert String value to date for field " + fieldDesc + " because it is not in the correct format of: " + format, e); - } - } - - throw new TypeMismatchException("Cannot convert value of type " + value.getClass() + " to Boolean for field " + fieldDesc); + return DataTypeUtils.toDate(getValue(fieldName), format); } @Override public Object[] getAsArray(final String fieldName) { - return convertToArray(getValue(fieldName)); + return DataTypeUtils.toArray(getValue(fieldName)); } - private Object[] convertToArray(final Object value) { - if (value == null) { - return null; - } - - if (value instanceof Object[]) { - return (Object[]) value; - } - - if (value instanceof List) { - return ((List<?>) value).toArray(); - } - - return new Object[] {value}; - } @Override public int hashCode() { http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java index ca85741..e1d52e9 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java @@ -50,12 +50,10 @@ public interface Record { Float getAsFloat(String fieldName); - Record getAsRecord(String fieldName); + Record getAsRecord(String fieldName, RecordSchema schema); Boolean getAsBoolean(String fieldName); - Date getAsDate(String fieldName); - Date getAsDate(String fieldName, String format); Object[] getAsArray(String fieldName); http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java index 8ad212b..cc83a41 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java @@ -17,35 +17,171 @@ package org.apache.nifi.serialization.record; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.ChoiceDataType; +import org.apache.nifi.serialization.record.type.RecordDataType; + public enum RecordFieldType { + /** + * A String field type. Fields of this type use a {@code java.lang.String} value. + */ STRING("string"), + + /** + * A boolean field type. Fields of this type use a {@code boolean} value. + */ BOOLEAN("boolean"), + + /** + * A byte field type. Fields of this type use a {@code byte} value. + */ BYTE("byte"), + + /** + * A char field type. Fields of this type use a {@code char} value. + */ CHAR("char"), + + /** + * A short field type. Fields of this type use a {@code short} value. + */ SHORT("short"), + + /** + * An int field type. Fields of this type use an {@code int} value. + */ INT("int"), + + /** + * A bigint field type. Fields of this type use a {@code java.math.BigInteger} value. + */ BIGINT("bigint"), + + /** + * A long field type. Fields of this type use a {@code long} value. + */ LONG("long"), + + /** + * A float field type. Fields of this type use a {@code float} value. + */ FLOAT("float"), + + /** + * A double field type. Fields of this type use a {@code double} value. + */ DOUBLE("double"), + + /** + * A date field type. Fields of this type use a {@code java.sql.Date} value. + */ DATE("date", "yyyy-MM-dd"), + + /** + * A time field type. Fields of this type use a {@code java.sql.Time} value. + */ TIME("time", "HH:mm:ss"), + + /** + * A timestamp field type. Fields of this type use a {@code java.sql.Timestamp} value. + */ TIMESTAMP("timestamp", "yyyy-MM-dd HH:mm:ss"), - RECORD("record"), - CHOICE("choice"), - ARRAY("array"); + + /** + * <p> + * A record field type. Fields of this type use a {@code org.apache.nifi.serialization.record.Record} value. A Record DataType should be + * created by providing the {@link RecordSchema} for the record: + * </p> + * + * <code> + * final DataType recordType = RecordFieldType.RECORD.getRecordDataType(recordSchema); + * </code> + * + * <p> + * A field of type RECORD should always have a {@link RecordDataType}, so the following idiom is acceptable for use: + * </p> + * + * <code> + * <pre> + * final DataType dataType = ...; + * if (dataType.getFieldType() == RecordFieldType.RECORD) { + * final RecordDataType recordDataType = (RecordDataType) dataType; + * final RecordSchema childSchema = recordDataType.getChildSchema(); + * ... + * } + * </pre> + * </code> + */ + RECORD("record", null, new RecordDataType(null)), + + /** + * <p> + * A choice field type. A field of type choice can be one of any number of different types, which are defined by the DataType that is used. + * For example, if a field should allow either a Long or an Integer, this can be accomplished by using: + * </p> + * + * <code> + * final DataType choiceType = RecordFieldType.CHOICE.getChoiceDataType( RecordFieldType.INT.getDataType(), RecordFieldType.LONG.getDataType() ); + * </code> + * + * <p> + * A field of type CHOICE should always have a {@link ChoiceDataType}, so the following idiom is acceptable for use: + * </p> + * + * <code> + * <pre> + * final DataType dataType = ...; + * if (dataType.getFieldType() == RecordFieldType.CHOICE) { + * final ChoiceDataType choiceDataType = (ChoiceDataType) dataType; + * final List<DataType> allowableTypes = choiceDataType.getPossibleSubTypes(); + * ... + * } + * </pre> + * </code> + */ + CHOICE("choice", null, new ChoiceDataType(Collections.emptyList())), + + /** + * <p> + * An array field type. Records should be updated using an {@code Object[]} value for this field. Note that we are explicitly indicating that + * Object[] should be used here and not primitive array types. For instance, setting a value of {@code int[]} is not allowed. The DataType for + * this field should be created using the {@link #getArrayDataType(DataType)} method: + * </p> + * + * <code> + * final DataType arrayType = RecordFieldType.ARRAY.getArrayDataType( RecordFieldType.INT.getDataType() ); + * </code> + * + * <p> + * A field of type ARRAY should always have an {@link ArrayDataType}, so the following idiom is acceptable for use: + * </p> + * + * <code> + * <pre> + * final DataType dataType = ...; + * if (dataType.getFieldType() == RecordFieldType.ARRAY) { + * final ArrayDataType arrayDataType = (ArrayDataType) dataType; + * final DataType elementType = arrayDataType.getElementType(); + * ... + * } + * </pre> + * </code> + */ + ARRAY("array", null, new ArrayDataType(null)); private static final Map<String, RecordFieldType> SIMPLE_NAME_MAP = new HashMap<String, RecordFieldType>(); static { - for (RecordFieldType value : values()) { - SIMPLE_NAME_MAP.put(value.simpleName, value); - } + for (RecordFieldType value : values()) { + SIMPLE_NAME_MAP.put(value.simpleName, value); + } } private final String simpleName; @@ -62,6 +198,12 @@ public enum RecordFieldType { this.defaultDataType = new DataType(this, defaultFormat); } + private RecordFieldType(final String simpleName, final String defaultFormat, final DataType defaultDataType) { + this.simpleName = simpleName; + this.defaultFormat = defaultFormat; + this.defaultDataType = defaultDataType; + } + public String getDefaultFormat() { return defaultFormat; } @@ -78,18 +220,50 @@ public enum RecordFieldType { } /** - * Returns a Data Type that represents a "RECORD" type with the given schema. + * Returns a Data Type that represents a "RECORD" or "ARRAY" type with the given schema. * - * @param childSchema the Schema for the Record - * @return a DataType that represents a Record with the given schema, or <code>null</code> if this RecordFieldType - * is not the RECORD type. + * @param childSchema the Schema for the Record or Array + * @return a DataType that represents a Record or Array with the given schema, or <code>null</code> if this RecordFieldType + * is not the RECORD or ARRAY type. */ - public DataType getDataType(final RecordSchema childSchema) { + public DataType getRecordDataType(final RecordSchema childSchema) { if (this != RECORD) { return null; } - return new DataType(this, getDefaultFormat(), childSchema); + return new RecordDataType(childSchema); + } + + /** + * Returns a Data Type that represents a "RECORD" or "ARRAY" type with the given schema. + * + * @param elementType the type of the arrays in the element + * @return a DataType that represents a Record or Array with the given schema, or <code>null</code> if this RecordFieldType + * is not the RECORD or ARRAY type. + */ + public DataType getArrayDataType(final DataType elementType) { + if (this != ARRAY) { + return null; + } + + return new ArrayDataType(elementType); + } + + + /** + * Returns a Data Type that represents a "CHOICE" of multiple possible types. This method is + * only applicable for a RecordFieldType of {@link #CHOICE}. + * + * @param possibleChildTypes the possible types that are allowable + * @return a DataType that represents a "CHOICE" of multiple possible types, or <code>null</code> if this RecordFieldType + * is not the CHOICE type. + */ + public DataType getChoiceDataType(final List<DataType> possibleChildTypes) { + if (this != CHOICE) { + return null; + } + + return new ChoiceDataType(possibleChildTypes); } /** @@ -100,14 +274,20 @@ public enum RecordFieldType { * @return a DataType that represents a "CHOICE" of multiple possible types, or <code>null</code> if this RecordFieldType * is not the CHOICE type. */ - public DataType getDataType(final List<DataType> possibleChildTypes) { + public DataType getChoiceDataType(final DataType... possibleChildTypes) { if (this != CHOICE) { return null; } - return new DataType(this, getDefaultFormat(), possibleChildTypes); + final List<DataType> list = new ArrayList<>(possibleChildTypes.length); + for (final DataType type : possibleChildTypes) { + list.add(type); + } + + return new ChoiceDataType(list); } + public static RecordFieldType of(final String typeString) { return SIMPLE_NAME_MAP.get(typeString); } http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java index e166918..be064ab 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java @@ -19,6 +19,8 @@ package org.apache.nifi.serialization.record; import java.io.Closeable; import java.io.IOException; +import java.math.BigInteger; +import java.sql.Array; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -39,9 +41,11 @@ public class ResultSetRecordSet implements RecordSet, Closeable { private final ResultSet rs; private final RecordSchema schema; private final Set<String> rsColumnNames; + private boolean moreRows; public ResultSetRecordSet(final ResultSet rs) throws SQLException { this.rs = rs; + moreRows = rs.next(); this.schema = createSchema(rs); rsColumnNames = new HashSet<>(); @@ -59,14 +63,16 @@ public class ResultSetRecordSet implements RecordSet, Closeable { @Override public Record next() throws IOException { try { - if (rs.next()) { - return createRecord(rs); + if (moreRows) { + final Record record = createRecord(rs); + moreRows = rs.next(); + return record; + } else { + return null; } } catch (final SQLException e) { throw new IOException("Could not obtain next record from ResultSet", e); } - - return null; } @Override @@ -86,7 +92,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable { final Object value; if (rsColumnNames.contains(fieldName)) { - value = rs.getObject(field.getFieldName()); + value = normalizeValue(rs.getObject(fieldName)); } else { value = null; } @@ -97,6 +103,19 @@ public class ResultSetRecordSet implements RecordSet, Closeable { return new MapRecord(schema, values); } + @SuppressWarnings("rawtypes") + private Object normalizeValue(final Object value) { + if (value == null) { + return null; + } + + if (value instanceof List) { + return ((List) value).toArray(); + } + + return value; + } + private static RecordSchema createSchema(final ResultSet rs) throws SQLException { final ResultSetMetaData metadata = rs.getMetaData(); final int numCols = metadata.getColumnCount(); @@ -106,26 +125,149 @@ public class ResultSetRecordSet implements RecordSet, Closeable { final int column = i + 1; final int sqlType = metadata.getColumnType(column); - final RecordFieldType fieldType = getFieldType(sqlType); + final DataType dataType = getDataType(sqlType, rs, column); final String fieldName = metadata.getColumnLabel(column); - final RecordField field = new RecordField(fieldName, fieldType.getDataType()); + final RecordField field = new RecordField(fieldName, dataType); fields.add(field); } return new SimpleRecordSchema(fields); } - private static RecordFieldType getFieldType(final int sqlType) { + private static DataType getDataType(final int sqlType, final ResultSet rs, final int columnIndex) throws SQLException { switch (sqlType) { case Types.ARRAY: - return RecordFieldType.ARRAY; - case Types.BIGINT: - case Types.ROWID: - return RecordFieldType.LONG; + // The JDBC API does not allow us to know what the base type of an array is through the metadata. + // As a result, we have to obtain the actual Array for this record. Once we have this, we can determine + // the base type. However, if the base type is, itself, an array, we will simply return a base type of + // String because otherwise, we need the ResultSet for the array itself, and many JDBC Drivers do not + // support calling Array.getResultSet() and will throw an Exception if that is not supported. + if (rs.isAfterLast()) { + return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()); + } + + final Array array = rs.getArray(columnIndex); + if (array == null) { + return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()); + } + + final DataType baseType = getArrayBaseType(array); + return RecordFieldType.ARRAY.getArrayDataType(baseType); case Types.BINARY: case Types.LONGVARBINARY: case Types.VARBINARY: - return RecordFieldType.ARRAY; + return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()); + default: + return getFieldType(sqlType).getDataType(); + } + } + + private static DataType getArrayBaseType(final Array array) throws SQLException { + final Object arrayValue = array.getArray(); + if (arrayValue == null) { + return RecordFieldType.STRING.getDataType(); + } + + if (arrayValue instanceof byte[]) { + return RecordFieldType.BYTE.getDataType(); + } + if (arrayValue instanceof int[]) { + return RecordFieldType.INT.getDataType(); + } + if (arrayValue instanceof long[]) { + return RecordFieldType.LONG.getDataType(); + } + if (arrayValue instanceof boolean[]) { + return RecordFieldType.BOOLEAN.getDataType(); + } + if (arrayValue instanceof short[]) { + return RecordFieldType.SHORT.getDataType(); + } + if (arrayValue instanceof byte[]) { + return RecordFieldType.BYTE.getDataType(); + } + if (arrayValue instanceof float[]) { + return RecordFieldType.FLOAT.getDataType(); + } + if (arrayValue instanceof double[]) { + return RecordFieldType.DOUBLE.getDataType(); + } + if (arrayValue instanceof char[]) { + return RecordFieldType.CHAR.getDataType(); + } + if (arrayValue instanceof Object[]) { + final Object[] values = (Object[]) arrayValue; + if (values.length == 0) { + return RecordFieldType.STRING.getDataType(); + } + + Object valueToLookAt = null; + for (int i = 0; i < values.length; i++) { + valueToLookAt = values[i]; + if (valueToLookAt != null) { + break; + } + } + if (valueToLookAt == null) { + return RecordFieldType.STRING.getDataType(); + } + + if (valueToLookAt instanceof String) { + return RecordFieldType.STRING.getDataType(); + } + if (valueToLookAt instanceof Long) { + return RecordFieldType.LONG.getDataType(); + } + if (valueToLookAt instanceof Integer) { + return RecordFieldType.INT.getDataType(); + } + if (valueToLookAt instanceof Short) { + return RecordFieldType.SHORT.getDataType(); + } + if (valueToLookAt instanceof Byte) { + return RecordFieldType.BYTE.getDataType(); + } + if (valueToLookAt instanceof Float) { + return RecordFieldType.FLOAT.getDataType(); + } + if (valueToLookAt instanceof Double) { + return RecordFieldType.DOUBLE.getDataType(); + } + if (valueToLookAt instanceof Boolean) { + return RecordFieldType.BOOLEAN.getDataType(); + } + if (valueToLookAt instanceof Character) { + return RecordFieldType.CHAR.getDataType(); + } + if (valueToLookAt instanceof BigInteger) { + return RecordFieldType.BIGINT.getDataType(); + } + if (valueToLookAt instanceof Integer) { + return RecordFieldType.INT.getDataType(); + } + if (valueToLookAt instanceof java.sql.Time) { + return RecordFieldType.TIME.getDataType(); + } + if (valueToLookAt instanceof java.sql.Date) { + return RecordFieldType.DATE.getDataType(); + } + if (valueToLookAt instanceof java.sql.Timestamp) { + return RecordFieldType.TIMESTAMP.getDataType(); + } + if (valueToLookAt instanceof Record) { + return RecordFieldType.RECORD.getDataType(); + } + } + + return RecordFieldType.STRING.getDataType(); + } + + + private static RecordFieldType getFieldType(final int sqlType) { + switch (sqlType) { + case Types.BIGINT: + case Types.ROWID: + return RecordFieldType.LONG; case Types.BIT: case Types.BOOLEAN: return RecordFieldType.BOOLEAN; http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java new file mode 100644 index 0000000..f507f23 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record.type; + +import java.util.Objects; + +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordFieldType; + +public class ArrayDataType extends DataType { + private final DataType elementType; + + public ArrayDataType(final DataType elementType) { + super(RecordFieldType.ARRAY, null); + this.elementType = elementType; + } + + public DataType getElementType() { + return elementType; + } + + @Override + public RecordFieldType getFieldType() { + return RecordFieldType.ARRAY; + } + + @Override + public int hashCode() { + return 31 + 41 * getFieldType().hashCode() + 41 * (elementType == null ? 0 : elementType.hashCode()); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof RecordDataType)) { + return false; + } + + final ArrayDataType other = (ArrayDataType) obj; + return getFieldType().equals(other.getFieldType()) && Objects.equals(elementType, other.elementType); + } + + @Override + public String toString() { + return "ARRAY[" + elementType + "]"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java new file mode 100644 index 0000000..b74cdcc --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record.type; + +import java.util.List; +import java.util.Objects; + +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordFieldType; + +public class ChoiceDataType extends DataType { + private final List<DataType> possibleSubTypes; + + public ChoiceDataType(final List<DataType> possibleSubTypes) { + super(RecordFieldType.CHOICE, null); + this.possibleSubTypes = Objects.requireNonNull(possibleSubTypes); + } + + public List<DataType> getPossibleSubTypes() { + return possibleSubTypes; + } + + @Override + public RecordFieldType getFieldType() { + return RecordFieldType.CHOICE; + } + + @Override + public int hashCode() { + return 31 + 41 * getFieldType().hashCode() + 41 * (possibleSubTypes == null ? 0 : possibleSubTypes.hashCode()); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof RecordDataType)) { + return false; + } + + final ChoiceDataType other = (ChoiceDataType) obj; + return getFieldType().equals(other.getFieldType()) && Objects.equals(possibleSubTypes, other.possibleSubTypes); + } + + @Override + public String toString() { + return "CHOICE" + possibleSubTypes; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java new file mode 100644 index 0000000..f24d036 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record.type; + +import java.util.Objects; + +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; + +public class RecordDataType extends DataType { + private final RecordSchema childSchema; + + public RecordDataType(final RecordSchema childSchema) { + super(RecordFieldType.RECORD, null); + this.childSchema = childSchema; + } + + @Override + public RecordFieldType getFieldType() { + return RecordFieldType.RECORD; + } + + public RecordSchema getChildSchema() { + return childSchema; + } + + @Override + public int hashCode() { + return 31 + 41 * getFieldType().hashCode() + 41 * (childSchema == null ? 0 : childSchema.hashCode()); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof RecordDataType)) { + return false; + } + + final RecordDataType other = (RecordDataType) obj; + return getFieldType().equals(other.getFieldType()) && Objects.equals(childSchema, other.childSchema); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java new file mode 100644 index 0000000..1cdefb8 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -0,0 +1,608 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record.util; + +import java.math.BigInteger; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.TimeZone; +import java.util.function.Consumer; + +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.ChoiceDataType; +import org.apache.nifi.serialization.record.type.RecordDataType; + +public class DataTypeUtils { + + private static final TimeZone gmt = TimeZone.getTimeZone("gmt"); + + public static Object convertType(final Object value, final DataType dataType) { + return convertType(value, dataType, RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat()); + } + + public static Object convertType(final Object value, final DataType dataType, final String dateFormat, final String timeFormat, final String timestampFormat) { + switch (dataType.getFieldType()) { + case BIGINT: + return toBigInt(value); + case BOOLEAN: + return toBoolean(value); + case BYTE: + return toByte(value); + case CHAR: + return toCharacter(value); + case DATE: + return toDate(value, dateFormat); + case DOUBLE: + return toDouble(value); + case FLOAT: + return toFloat(value); + case INT: + return toInteger(value); + case LONG: + return toLong(value); + case SHORT: + return toShort(value); + case STRING: + return toString(value, dateFormat, timeFormat, timestampFormat); + case TIME: + return toTime(value, timeFormat); + case TIMESTAMP: + return toTimestamp(value, timestampFormat); + case ARRAY: + return toArray(value); + case RECORD: + final RecordDataType recordType = (RecordDataType) dataType; + final RecordSchema childSchema = recordType.getChildSchema(); + return toRecord(value, childSchema); + case CHOICE: { + if (value == null) { + return null; + } + + final ChoiceDataType choiceDataType = (ChoiceDataType) dataType; + final DataType chosenDataType = chooseDataType(value, choiceDataType); + if (chosenDataType == null) { + throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + + " to any of the following available Sub-Types for a Choice: " + choiceDataType.getPossibleSubTypes()); + } + + return convertType(value, chosenDataType); + } + } + + return null; + } + + + public static boolean isCompatibleDataType(final Object value, final DataType dataType) { + switch (dataType.getFieldType()) { + case ARRAY: + return isArrayTypeCompatible(value); + case BIGINT: + return isBigIntTypeCompatible(value); + case BOOLEAN: + return isBooleanTypeCompatible(value); + case BYTE: + return isByteTypeCompatible(value); + case CHAR: + return isCharacterTypeCompatible(value); + case DATE: + return isDateTypeCompatible(value, dataType.getFormat()); + case DOUBLE: + return isDoubleTypeCompatible(value); + case FLOAT: + return isFloatTypeCompatible(value); + case INT: + return isIntegerTypeCompatible(value); + case LONG: + return isLongTypeCompatible(value); + case RECORD: + return isRecordTypeCompatible(value); + case SHORT: + return isShortTypeCompatible(value); + case TIME: + return isTimeTypeCompatible(value, dataType.getFormat()); + case TIMESTAMP: + return isTimestampTypeCompatible(value, dataType.getFormat()); + case STRING: + return isStringTypeCompatible(value); + case CHOICE: { + final DataType chosenDataType = chooseDataType(value, (ChoiceDataType) dataType); + return chosenDataType != null; + } + } + + return false; + } + + public static DataType chooseDataType(final Object value, final ChoiceDataType choiceType) { + for (final DataType subType : choiceType.getPossibleSubTypes()) { + if (isCompatibleDataType(value, subType)) { + return subType; + } + } + + return null; + } + + public static Record toRecord(final Object value, final RecordSchema recordSchema) { + if (value == null) { + return null; + } + + if (value instanceof Record) { + return ((Record) value); + } + + if (value instanceof Map) { + if (recordSchema == null) { + throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + + " to Record because the value is a Map but no Record Schema was provided"); + } + + final Map<?, ?> map = (Map<?, ?>) value; + final Map<String, Object> coercedValues = new HashMap<>(); + + for (final Map.Entry<?, ?> entry : map.entrySet()) { + final Object keyValue = entry.getKey(); + if (keyValue == null) { + continue; + } + + final String key = keyValue.toString(); + final Optional<DataType> desiredTypeOption = recordSchema.getDataType(key); + if (!desiredTypeOption.isPresent()) { + continue; + } + + final Object rawValue = entry.getValue(); + final Object coercedValue = convertType(rawValue, desiredTypeOption.get()); + coercedValues.put(key, coercedValue); + } + + return new MapRecord(recordSchema, coercedValues); + } + + throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Record"); + } + + public static boolean isRecordTypeCompatible(final Object value) { + return value != null && value instanceof Record; + } + + public static Object[] toArray(final Object value) { + if (value == null) { + return null; + } + + if (value instanceof Object[]) { + return (Object[]) value; + } + + throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Object Array"); + } + + public static boolean isArrayTypeCompatible(final Object value) { + return value != null && value instanceof Object[]; + } + + public static String toString(final Object value, final String dateFormat, final String timeFormat, final String timestampFormat) { + if (value == null) { + return null; + } + + if (value instanceof String) { + return (String) value; + } + + if (value instanceof java.sql.Date) { + return getDateFormat(dateFormat).format((java.util.Date) value); + } + if (value instanceof java.sql.Time) { + return getDateFormat(timeFormat).format((java.util.Date) value); + } + if (value instanceof java.sql.Timestamp) { + return getDateFormat(timestampFormat).format((java.util.Date) value); + } + if (value instanceof java.util.Date) { + return getDateFormat(timestampFormat).format((java.util.Date) value); + } + + return value.toString(); + } + + public static boolean isStringTypeCompatible(final Object value) { + return value != null && (value instanceof String || value instanceof java.util.Date); + } + + public static java.sql.Date toDate(final Object value, final String format) { + if (value == null) { + return null; + } + + if (value instanceof Date) { + return (Date) value; + } + + if (value instanceof Number) { + final long longValue = ((Number) value).longValue(); + return new Date(longValue); + } + + if (value instanceof String) { + try { + final java.util.Date utilDate = getDateFormat(format).parse((String) value); + return new Date(utilDate.getTime()); + } catch (final ParseException e) { + throw new IllegalTypeConversionException("Could not convert value [" + value + + "] of type java.lang.String to Date because the value is not in the expected date format: " + format); + } + } + + throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Date"); + } + + public static boolean isDateTypeCompatible(final Object value, final String format) { + if (value == null) { + return false; + } + + if (value instanceof java.util.Date || value instanceof Number) { + return true; + } + + if (value instanceof String) { + try { + getDateFormat(format).parse((String) value); + return true; + } catch (final ParseException e) { + return false; + } + } + + return false; + } + + public static Time toTime(final Object value, final String format) { + if (value == null) { + return null; + } + + if (value instanceof Time) { + return (Time) value; + } + + if (value instanceof Number) { + final long longValue = ((Number) value).longValue(); + return new Time(longValue); + } + + if (value instanceof String) { + try { + final java.util.Date utilDate = getDateFormat(format).parse((String) value); + return new Time(utilDate.getTime()); + } catch (final ParseException e) { + throw new IllegalTypeConversionException("Could not convert value [" + value + + "] of type java.lang.String to Time because the value is not in the expected date format: " + format); + } + } + + throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Time"); + } + + private static DateFormat getDateFormat(final String format) { + final DateFormat df = new SimpleDateFormat(format); + df.setTimeZone(gmt); + return df; + } + + public static boolean isTimeTypeCompatible(final Object value, final String format) { + return isDateTypeCompatible(value, format); + } + + public static Timestamp toTimestamp(final Object value, final String format) { + if (value == null) { + return null; + } + + if (value instanceof Timestamp) { + return (Timestamp) value; + } + + if (value instanceof Number) { + final long longValue = ((Number) value).longValue(); + return new Timestamp(longValue); + } + + if (value instanceof String) { + try { + final java.util.Date utilDate = getDateFormat(format).parse((String) value); + return new Timestamp(utilDate.getTime()); + } catch (final ParseException e) { + throw new IllegalTypeConversionException("Could not convert value [" + value + + "] of type java.lang.String to Timestamp because the value is not in the expected date format: " + format); + } + } + + throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Timestamp"); + } + + public static boolean isTimestampTypeCompatible(final Object value, final String format) { + return isDateTypeCompatible(value, format); + } + + + public static BigInteger toBigInt(final Object value) { + if (value == null) { + return null; + } + + if (value instanceof BigInteger) { + return (BigInteger) value; + } + if (value instanceof Long) { + return BigInteger.valueOf((Long) value); + } + + throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to BigInteger"); + } + + public static boolean isBigIntTypeCompatible(final Object value) { + return value == null && (value instanceof BigInteger || value instanceof Long); + } + + public static Boolean toBoolean(final Object value) { + if (value == null) { + return null; + } + + if (value instanceof Boolean) { + return (Boolean) value; + } + if (value instanceof String) { + final String string = (String) value; + if (string.equalsIgnoreCase("true")) { + return Boolean.TRUE; + } else if (string.equalsIgnoreCase("false")) { + return Boolean.FALSE; + } + } + + throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Boolean"); + } + + public static boolean isBooleanTypeCompatible(final Object value) { + if (value == null) { + return false; + } + if (value instanceof Boolean) { + return true; + } + if (value instanceof String) { + final String string = (String) value; + return string.equalsIgnoreCase("true") || string.equalsIgnoreCase("false"); + } + return false; + } + + public static Double toDouble(final Object value) { + if (value == null) { + return null; + } + + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } + + if (value instanceof String) { + return Double.parseDouble((String) value); + } + + throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Double"); + } + + public static boolean isDoubleTypeCompatible(final Object value) { + return isNumberTypeCompatible(value, s -> Double.parseDouble(s)); + } + + private static boolean isNumberTypeCompatible(final Object value, final Consumer<String> stringValueVerifier) { + if (value == null) { + return false; + } + + if (value instanceof Number) { + return true; + } + + if (value instanceof String) { + try { + stringValueVerifier.accept((String) value); + return true; + } catch (final NumberFormatException nfe) { + return false; + } + } + + return false; + } + + public static Float toFloat(final Object value) { + if (value == null) { + return null; + } + + if (value instanceof Number) { + return ((Number) value).floatValue(); + } + + if (value instanceof String) { + return Float.parseFloat((String) value); + } + + throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Float"); + } + + public static boolean isFloatTypeCompatible(final Object value) { + return isNumberTypeCompatible(value, s -> Float.parseFloat(s)); + } + + public static Long toLong(final Object value) { + if (value == null) { + return null; + } + + if (value instanceof Number) { + return ((Number) value).longValue(); + } + + if (value instanceof String) { + return Long.parseLong((String) value); + } + + if (value instanceof java.util.Date) { + return ((java.util.Date) value).getTime(); + } + + throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Long"); + } + + public static boolean isLongTypeCompatible(final Object value) { + if (value == null) { + return false; + } + + if (value instanceof Number) { + return true; + } + + if (value instanceof java.util.Date) { + return true; + } + + if (value instanceof String) { + try { + Long.parseLong((String) value); + return true; + } catch (final NumberFormatException nfe) { + return false; + } + } + + return false; + } + + + public static Integer toInteger(final Object value) { + if (value == null) { + return null; + } + + if (value instanceof Number) { + return ((Number) value).intValue(); + } + + if (value instanceof String) { + return Integer.parseInt((String) value); + } + + throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Integer"); + } + + public static boolean isIntegerTypeCompatible(final Object value) { + return isNumberTypeCompatible(value, s -> Integer.parseInt(s)); + } + + + public static Short toShort(final Object value) { + if (value == null) { + return null; + } + + if (value instanceof Number) { + return ((Number) value).shortValue(); + } + + if (value instanceof String) { + return Short.parseShort((String) value); + } + + throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Short"); + } + + public static boolean isShortTypeCompatible(final Object value) { + return isNumberTypeCompatible(value, s -> Short.parseShort(s)); + } + + public static Byte toByte(final Object value) { + if (value == null) { + return null; + } + + if (value instanceof Number) { + return ((Number) value).byteValue(); + } + + if (value instanceof String) { + return Byte.parseByte((String) value); + } + + throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Byte"); + } + + public static boolean isByteTypeCompatible(final Object value) { + return isNumberTypeCompatible(value, s -> Byte.parseByte(s)); + } + + + public static Character toCharacter(final Object value) { + if (value == null) { + return null; + } + + if (value instanceof Character) { + return ((Character) value); + } + + if (value instanceof CharSequence) { + final CharSequence charSeq = (CharSequence) value; + if (charSeq.length() == 0) { + throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Character because it has a length of 0"); + } + + return charSeq.charAt(0); + } + + throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Character"); + } + + public static boolean isCharacterTypeCompatible(final Object value) { + return value != null && (value instanceof Character || (value instanceof CharSequence && ((CharSequence) value).length() > 0)); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java new file mode 100644 index 0000000..38b5d20 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record.util; + +public class IllegalTypeConversionException extends RuntimeException { + + public IllegalTypeConversionException(final String message) { + super(message); + } + + public IllegalTypeConversionException(final String message, final Throwable cause) { + super(message, cause); + } +}
