http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java deleted file mode 100644 index 4449afc..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.nifi.avro; - -import org.apache.avro.Schema; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; - -public class AvroSchemaValidator implements Validator { - - @Override - public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { - return new ValidationResult.Builder() - .input(input) - .subject(subject) - .valid(true) - .explanation("Expression Language is present") - .build(); - } - - try { - new Schema.Parser().parse(input); - - return new ValidationResult.Builder() - .valid(true) - .build(); - } catch (final Exception e) { - return new ValidationResult.Builder() - .input(input) - .subject(subject) - .valid(false) - .explanation("Not a valid Avro Schema: " + e.getMessage()) - .build(); - } - } - -}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java deleted file mode 100644 index b65026a..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.nifi.avro; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - -import org.apache.avro.LogicalType; -import org.apache.avro.LogicalTypes; -import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; -import org.apache.avro.Schema.Type; -import org.apache.nifi.schema.access.SchemaNotFoundException; -import org.apache.nifi.serialization.SimpleRecordSchema; -import org.apache.nifi.serialization.record.DataType; -import org.apache.nifi.serialization.record.RecordField; -import org.apache.nifi.serialization.record.RecordFieldType; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.SchemaIdentifier; -import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; - -public class AvroTypeUtil { - public static final String AVRO_SCHEMA_FORMAT = "avro"; - - public static Schema extractAvroSchema(final RecordSchema recordSchema) throws SchemaNotFoundException { - final Optional<String> schemaFormatOption = recordSchema.getSchemaFormat(); - if (!schemaFormatOption.isPresent()) { - throw new SchemaNotFoundException("No Schema Format was present in the RecordSchema"); - } - - final String schemaFormat = schemaFormatOption.get(); - if (!schemaFormat.equals(AVRO_SCHEMA_FORMAT)) { - throw new SchemaNotFoundException("Schema provided is not in Avro format"); - } - - final Optional<String> textOption = recordSchema.getSchemaText(); - if (!textOption.isPresent()) { - throw new SchemaNotFoundException("No Schema text was present in the RecordSchema"); - } - - final String text = textOption.get(); - return new Schema.Parser().parse(text); - } - - public static DataType determineDataType(final Schema avroSchema) { - final Type avroType = avroSchema.getType(); - - switch (avroType) { - case BYTES: - case FIXED: - return 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()); - case ARRAY: - final DataType elementType = determineDataType(avroSchema.getElementType()); - return RecordFieldType.ARRAY.getArrayDataType(elementType); - case BOOLEAN: - return RecordFieldType.BOOLEAN.getDataType(); - case DOUBLE: - return RecordFieldType.DOUBLE.getDataType(); - case ENUM: - case STRING: - return RecordFieldType.STRING.getDataType(); - case FLOAT: - return RecordFieldType.FLOAT.getDataType(); - case INT: { - final LogicalType logicalType = avroSchema.getLogicalType(); - if (logicalType == null) { - return RecordFieldType.INT.getDataType(); - } - - if (LogicalTypes.date().getName().equals(logicalType.getName())) { - return RecordFieldType.DATE.getDataType(); - } else if (LogicalTypes.timeMillis().getName().equals(logicalType.getName())) { - return RecordFieldType.TIME.getDataType(); - } - - return RecordFieldType.INT.getDataType(); - } - case LONG: { - final LogicalType logicalType = avroSchema.getLogicalType(); - if (logicalType == null) { - return RecordFieldType.LONG.getDataType(); - } - - if (LogicalTypes.timestampMillis().getName().equals(logicalType.getName())) { - return RecordFieldType.TIMESTAMP.getDataType(); - } else if (LogicalTypes.timestampMicros().getName().equals(logicalType.getName())) { - return RecordFieldType.TIMESTAMP.getDataType(); - } else if (LogicalTypes.timeMicros().getName().equals(logicalType.getName())) { - return RecordFieldType.TIME.getDataType(); - } - - return RecordFieldType.LONG.getDataType(); - } - case RECORD: { - final List<Field> avroFields = avroSchema.getFields(); - final List<RecordField> recordFields = new ArrayList<>(avroFields.size()); - - for (final Field field : avroFields) { - final String fieldName = field.name(); - final Schema fieldSchema = field.schema(); - final DataType fieldType = determineDataType(fieldSchema); - recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases())); - } - - 
final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY); - return RecordFieldType.RECORD.getRecordDataType(recordSchema); - } - case NULL: - return RecordFieldType.STRING.getDataType(); - case MAP: - final Schema valueSchema = avroSchema.getValueType(); - final DataType valueType = determineDataType(valueSchema); - return RecordFieldType.MAP.getMapDataType(valueType); - case UNION: { - final List<Schema> nonNullSubSchemas = avroSchema.getTypes().stream() - .filter(s -> s.getType() != Type.NULL) - .collect(Collectors.toList()); - - if (nonNullSubSchemas.size() == 1) { - return determineDataType(nonNullSubSchemas.get(0)); - } - - final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size()); - for (final Schema subSchema : nonNullSubSchemas) { - final DataType childDataType = determineDataType(subSchema); - possibleChildTypes.add(childDataType); - } - - return RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes); - } - } - - return null; - } - - public static RecordSchema createSchema(final Schema avroSchema) { - final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size()); - for (final Field field : avroSchema.getFields()) { - final String fieldName = field.name(); - final DataType dataType = AvroTypeUtil.determineDataType(field.schema()); - - recordFields.add(new RecordField(fieldName, dataType, field.defaultVal(), field.aliases())); - } - - final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY); - return recordSchema; - } - - public static Object[] convertByteArray(final byte[] bytes) { - final Object[] array = new Object[bytes.length]; - for (int i = 0; i < bytes.length; i++) { - array[i] = Byte.valueOf(bytes[i]); - } - return array; - } - - public static ByteBuffer convertByteArray(final Object[] bytes) { - final ByteBuffer bb = 
ByteBuffer.allocate(bytes.length); - for (final Object o : bytes) { - if (o instanceof Byte) { - bb.put(((Byte) o).byteValue()); - } else { - throw new IllegalTypeConversionException("Cannot convert value " + bytes + " of type " + bytes.getClass() + " to ByteBuffer"); - } - } - bb.flip(); - return bb; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java index 55f796a..c09e3d5 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java @@ -17,36 +17,18 @@ package org.apache.nifi.avro; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.time.temporal.ChronoUnit; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.avro.LogicalType; -import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericData; -import 
org.apache.avro.generic.GenericData.EnumSymbol; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordField; -import org.apache.nifi.serialization.record.RecordFieldType; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.util.DataTypeUtils; -import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collections; public abstract class WriteAvroResult implements RecordSetWriter { private final Schema schema; @@ -59,143 +41,9 @@ public abstract class WriteAvroResult implements RecordSetWriter { return schema; } - protected GenericRecord createAvroRecord(final Record record, final Schema avroSchema) throws IOException { - final GenericRecord rec = new GenericData.Record(avroSchema); - final RecordSchema recordSchema = record.getSchema(); - - for (final RecordField recordField : recordSchema.getFields()) { - final Object rawValue = record.getValue(recordField); - final String fieldName = recordField.getFieldName(); - - final Field field = avroSchema.getField(fieldName); - if (field == null) { - continue; - } - - final Object converted = convertToAvroObject(rawValue, field.schema(), fieldName); - rec.put(fieldName, converted); - } - - return rec; - } - - protected Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final String fieldName) throws IOException { - if (rawValue == null) { - return null; - } - - switch (fieldSchema.getType()) { - case INT: { - final LogicalType logicalType = fieldSchema.getLogicalType(); - if (logicalType == null) { - return DataTypeUtils.toInteger(rawValue, fieldName); - } - - if 
(LogicalTypes.date().getName().equals(logicalType.getName())) { - final long longValue = DataTypeUtils.toLong(rawValue, fieldName); - final Date date = new Date(longValue); - final Duration duration = Duration.between(new Date(0L).toInstant(), date.toInstant()); - final long days = duration.toDays(); - return (int) days; - } else if (LogicalTypes.timeMillis().getName().equals(logicalType.getName())) { - final long longValue = DataTypeUtils.toLong(rawValue, fieldName); - final Date date = new Date(longValue); - final Duration duration = Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), date.toInstant()); - final long millisSinceMidnight = duration.toMillis(); - return (int) millisSinceMidnight; - } - - return DataTypeUtils.toInteger(rawValue, fieldName); - } - case LONG: { - final LogicalType logicalType = fieldSchema.getLogicalType(); - if (logicalType == null) { - return DataTypeUtils.toLong(rawValue, fieldName); - } - - if (LogicalTypes.timeMicros().getName().equals(logicalType.getName())) { - final long longValue = DataTypeUtils.toLong(rawValue, fieldName); - final Date date = new Date(longValue); - final Duration duration = Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), date.toInstant()); - return duration.toMillis() * 1000L; - } else if (LogicalTypes.timestampMillis().getName().equals(logicalType.getName())) { - return DataTypeUtils.toLong(rawValue, fieldName); - } else if (LogicalTypes.timestampMicros().getName().equals(logicalType.getName())) { - return DataTypeUtils.toLong(rawValue, fieldName) * 1000L; - } - - return DataTypeUtils.toLong(rawValue, fieldName); - } - case BYTES: - case FIXED: - if (rawValue instanceof byte[]) { - return ByteBuffer.wrap((byte[]) rawValue); - } - if (rawValue instanceof Object[]) { - return AvroTypeUtil.convertByteArray((Object[]) rawValue); - } else { - throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer"); - } - case 
MAP: - if (rawValue instanceof Record) { - final Record recordValue = (Record) rawValue; - final Map<String, Object> map = new HashMap<>(); - for (final RecordField recordField : recordValue.getSchema().getFields()) { - final Object v = recordValue.getValue(recordField); - if (v != null) { - map.put(recordField.getFieldName(), v); - } - } - - return map; - } else { - throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a Map"); - } - case RECORD: - final GenericData.Record avroRecord = new GenericData.Record(fieldSchema); - - final Record record = (Record) rawValue; - for (final RecordField recordField : record.getSchema().getFields()) { - final Object recordFieldValue = record.getValue(recordField); - final String recordFieldName = recordField.getFieldName(); - - final Field field = fieldSchema.getField(recordFieldName); - if (field == null) { - continue; - } - - final Object converted = convertToAvroObject(recordFieldValue, field.schema(), fieldName); - avroRecord.put(recordFieldName, converted); - } - return avroRecord; - case ARRAY: - final Object[] objectArray = (Object[]) rawValue; - final List<Object> list = new ArrayList<>(objectArray.length); - for (final Object o : objectArray) { - final Object converted = convertToAvroObject(o, fieldSchema.getElementType(), fieldName); - list.add(converted); - } - return list; - case BOOLEAN: - return DataTypeUtils.toBoolean(rawValue, fieldName); - case DOUBLE: - return DataTypeUtils.toDouble(rawValue, fieldName); - case FLOAT: - return DataTypeUtils.toFloat(rawValue, fieldName); - case NULL: - return null; - case ENUM: - return new EnumSymbol(fieldSchema, rawValue); - case STRING: - return DataTypeUtils.toString(rawValue, RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat()); - } - - return rawValue; - } - @Override public WriteResult write(final Record record, final OutputStream 
out) throws IOException { - final GenericRecord rec = createAvroRecord(record, schema); + final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema); final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) { @@ -206,7 +54,6 @@ public abstract class WriteAvroResult implements RecordSetWriter { return WriteResult.of(1, Collections.emptyMap()); } - @Override public String getMimeType() { return "application/avro-binary"; http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java index 74306e4..ba14b3a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java @@ -61,7 +61,7 @@ public class WriteAvroResultWithExternalSchema extends WriteAvroResult { final BinaryEncoder encoder = EncoderFactory.get().blockingBinaryEncoder(bufferedOut, null); do { - final GenericRecord rec = createAvroRecord(record, schema); + final GenericRecord rec = 
AvroTypeUtil.createAvroRecord(record, schema); datumWriter.write(rec, encoder); encoder.flush(); http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java index dca8aac..d55a5dd 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java @@ -51,7 +51,7 @@ public class WriteAvroResultWithSchema extends WriteAvroResult { dataFileWriter.create(schema, outStream); do { - final GenericRecord rec = createAvroRecord(record, schema); + final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema); dataFileWriter.append(rec); nrOfRows++; } while ((record = rs.next()) != null); http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java index 71093de..a57f10b 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java @@ -41,8 +41,14 @@ import org.apache.nifi.serialization.record.RecordSchema; public class CSVHeaderSchemaStrategy implements SchemaAccessStrategy { private static final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class); + private final ConfigurationContext context; + + public CSVHeaderSchemaStrategy(final ConfigurationContext context) { + this.context = context; + } + @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final ConfigurationContext context) throws SchemaNotFoundException { + public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException { try { final CSVFormat csvFormat = CSVUtils.createCSVFormat(context).withFirstRecordAsHeader(); try (final Reader reader = new InputStreamReader(new BOMInputStream(contentStream)); http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java index fb34f8f..9fe4136 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java @@ -51,7 +51,8 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact private final AllowableValue headerDerivedAllowableValue = new AllowableValue("csv-header-derived", "Use String Fields From Header", "The first non-comment line of the CSV file is a header line that contains the names of the columns. 
The schema will be derived by using the " + "column names in the header and assuming that all columns are of type String."); - private final SchemaAccessStrategy headerDerivedSchemaStrategy = new CSVHeaderSchemaStrategy(); + + private volatile SchemaAccessStrategy headerDerivedSchemaStrategy; private volatile CSVFormat csvFormat; private volatile String dateFormat; @@ -96,12 +97,12 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact } @Override - protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry) { + protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) { if (allowableValue.equalsIgnoreCase(headerDerivedAllowableValue.getValue())) { - return headerDerivedSchemaStrategy; + return new CSVHeaderSchemaStrategy(context); } - return super.getSchemaAccessStrategy(allowableValue, schemaRegistry); + return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java index 1048d21..13afe30 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java +++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java @@ -176,7 +176,8 @@ public class CSVUtils { .withAllowMissingColumnNames() .withIgnoreEmptyLines(); - if (context.getProperty(SKIP_HEADER_LINE).asBoolean()) { + final PropertyValue skipHeaderPropertyValue = context.getProperty(SKIP_HEADER_LINE); + if (skipHeaderPropertyValue.getValue() != null && skipHeaderPropertyValue.asBoolean()) { format = format.withFirstRecordAsHeader(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java index 778c738..6c8deab 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java @@ -178,13 +178,13 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac } @Override - protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry) { + protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) { if 
(allowableValue.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) { return new SchemaAccessStrategy() { private final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class); @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final ConfigurationContext context) throws SchemaNotFoundException { + public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException { return recordSchema; } @@ -194,7 +194,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac } }; } else { - return super.getSchemaAccessStrategy(allowableValue, schemaRegistry); + return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java deleted file mode 100644 index 27f84e4..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.schema.access; - -import java.io.InputStream; -import java.util.EnumSet; -import java.util.Set; - -import org.apache.avro.Schema; -import org.apache.nifi.avro.AvroTypeUtil; -import org.apache.nifi.components.PropertyValue; -import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.serialization.record.RecordSchema; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AvroSchemaTextStrategy implements SchemaAccessStrategy { - private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT); - - private static final Logger logger = LoggerFactory.getLogger(AvroSchemaTextStrategy.class); - private final PropertyValue schemaTextPropertyValue; - - public AvroSchemaTextStrategy(final PropertyValue schemaTextPropertyValue) { - this.schemaTextPropertyValue = schemaTextPropertyValue; - } - - @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final ConfigurationContext context) throws SchemaNotFoundException { - final String schemaText = schemaTextPropertyValue.evaluateAttributeExpressions(flowFile).getValue(); - if (schemaText == null || schemaText.trim().isEmpty()) { - throw new SchemaNotFoundException("FlowFile did not contain appropriate attributes to determine Schema Text"); 
- } - - logger.debug("For {} found schema text {}", flowFile, schemaText); - - try { - final Schema avroSchema = new Schema.Parser().parse(schemaText); - return AvroTypeUtil.createSchema(avroSchema); - } catch (final Exception e) { - throw new SchemaNotFoundException("Failed to create schema from the Schema Text after evaluating FlowFile Attributes", e); - } - } - - @Override - public Set<SchemaField> getSuppliedSchemaFields() { - return schemaFields; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java deleted file mode 100644 index 4eec14e..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.schema.access; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - -import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.schemaregistry.services.SchemaRegistry; -import org.apache.nifi.serialization.record.RecordSchema; - -public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccessStrategy { - private final Set<SchemaField> schemaFields; - - public static final String SCHEMA_ID_ATTRIBUTE = "schema.identifier"; - public static final String SCHEMA_VERSION_ATTRIBUTE = "schema.version"; - public static final String SCHEMA_PROTOCOL_VERSION_ATTRIBUTE = "schema.protocol.version"; - - private final SchemaRegistry schemaRegistry; - - - public HortonworksAttributeSchemaReferenceStrategy(final SchemaRegistry schemaRegistry) { - this.schemaRegistry = schemaRegistry; - - schemaFields = new HashSet<>(); - schemaFields.add(SchemaField.SCHEMA_IDENTIFIER); - schemaFields.add(SchemaField.SCHEMA_VERSION); - schemaFields.addAll(schemaRegistry == null ? 
Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields()); - } - - @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final ConfigurationContext context) throws SchemaNotFoundException, IOException { - final String schemaIdentifier = flowFile.getAttribute(SCHEMA_ID_ATTRIBUTE); - final String schemaVersion = flowFile.getAttribute(SCHEMA_VERSION_ATTRIBUTE); - final String schemaProtocol = flowFile.getAttribute(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE); - if (schemaIdentifier == null || schemaVersion == null || schemaProtocol == null) { - throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because it is missing one of the following three required attributes: " - + SCHEMA_ID_ATTRIBUTE + ", " + SCHEMA_VERSION_ATTRIBUTE + ", " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE); - } - - if (!isNumber(schemaProtocol)) { - throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '" - + schemaProtocol + "', which is not a valid Protocol Version number"); - } - - final int protocol = Integer.parseInt(schemaProtocol); - if (protocol != 1) { - throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '" - + schemaProtocol + "', which is not a valid Protocol Version number. 
Expected Protocol Version to be 1."); - } - - if (!isNumber(schemaIdentifier)) { - throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '" - + schemaProtocol + "', which is not a valid Schema Identifier number"); - } - - if (!isNumber(schemaVersion)) { - throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '" - + schemaProtocol + "', which is not a valid Schema Version number"); - } - - final long schemaId = Long.parseLong(schemaIdentifier); - final int version = Integer.parseInt(schemaVersion); - - final RecordSchema schema = schemaRegistry.retrieveSchema(schemaId, version); - if (schema == null) { - throw new SchemaNotFoundException("Could not find a Schema in the Schema Registry with Schema Identifier '" + schemaId + "' and Version '" + version + "'"); - } - - return schema; - } - - private static boolean isNumber(final String value) { - if (value == null) { - return false; - } - - final String trimmed = value.trim(); - if (value.isEmpty()) { - return false; - } - - for (int i = 0; i < trimmed.length(); i++) { - final char c = value.charAt(i); - if (c > '9' || c < '0') { - return false; - } - } - - return true; - } - - @Override - public Set<SchemaField> getSuppliedSchemaFields() { - return schemaFields; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java deleted file mode 100644 index f492ec4..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.nifi.schema.access; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.SchemaIdentifier; - -public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWriter { - private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION); - private static final int LATEST_PROTOCOL_VERSION = 1; - - @Override - public void writeHeader(RecordSchema schema, OutputStream out) throws IOException { - } - - @Override - public Map<String, String> getAttributes(final RecordSchema schema) { - final Map<String, String> attributes = new HashMap<>(4); - final SchemaIdentifier id = schema.getIdentifier(); - - final long schemaId = id.getIdentifier().getAsLong(); - final int schemaVersion = id.getVersion().getAsInt(); - - attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId)); - attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(schemaVersion)); - attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(LATEST_PROTOCOL_VERSION)); - - return attributes; - } - - @Override - public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException { - final SchemaIdentifier id = schema.getIdentifier(); - if (!id.getIdentifier().isPresent()) { - throw new SchemaNotFoundException("Cannot write Schema Reference as Attributes because it does not contain a Schema Identifier"); - } - if (!id.getVersion().isPresent()) { - throw new SchemaNotFoundException("Cannot write Schema Reference as Attributes because it does not contain a Schema Version"); - } - } - - @Override - public Set<SchemaField> getRequiredSchemaFields() 
{ - return requiredSchemaFields; - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java deleted file mode 100644 index 081e97c..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.nifi.schema.access; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - -import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.schemaregistry.services.SchemaRegistry; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.stream.io.StreamUtils; - -public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessStrategy { - private static final int LATEST_PROTOCOL_VERSION = 1; - - private final Set<SchemaField> schemaFields; - private final SchemaRegistry schemaRegistry; - - public HortonworksEncodedSchemaReferenceStrategy(final SchemaRegistry schemaRegistry) { - this.schemaRegistry = schemaRegistry; - - schemaFields = new HashSet<>(); - schemaFields.add(SchemaField.SCHEMA_IDENTIFIER); - schemaFields.add(SchemaField.SCHEMA_VERSION); - schemaFields.addAll(schemaRegistry == null ? 
Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields()); - } - - @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final ConfigurationContext context) throws SchemaNotFoundException, IOException { - final byte[] buffer = new byte[13]; - try { - StreamUtils.fillBuffer(contentStream, buffer); - } catch (final IOException ioe) { - throw new SchemaNotFoundException("Could not read first 13 bytes from stream", ioe); - } - - // This encoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer - // as it is provided at: - // https://github.com/hortonworks/registry/blob/master/schema-registry/serdes/src/main/java/com/hortonworks/registries/schemaregistry/serdes/avro/AvroSnapshotSerializer.java - final ByteBuffer bb = ByteBuffer.wrap(buffer); - final int protocolVersion = bb.get(); - if (protocolVersion != 1) { - throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. 
The latest known Protocol is Version " - + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocolVersion); - } - - final long schemaId = bb.getLong(); - final int schemaVersion = bb.getInt(); - - return schemaRegistry.retrieveSchema(schemaId, schemaVersion); - } - - @Override - public Set<SchemaField> getSuppliedSchemaFields() { - return schemaFields; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java deleted file mode 100644 index bf6a9ea..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.schema.access; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.EnumSet; -import java.util.Map; -import java.util.OptionalInt; -import java.util.OptionalLong; -import java.util.Set; - -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.SchemaIdentifier; - -public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWriter { - private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION); - private static final int LATEST_PROTOCOL_VERSION = 1; - - @Override - public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException { - final SchemaIdentifier identifier = schema.getIdentifier(); - final long id = identifier.getIdentifier().getAsLong(); - final int version = identifier.getVersion().getAsInt(); - - // This decoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer - // as it is provided at: - // https://github.com/hortonworks/registry/blob/master/schema-registry/serdes/src/main/java/com/hortonworks/registries/schemaregistry/serdes/avro/AvroSnapshotSerializer.java - final ByteBuffer bb = ByteBuffer.allocate(13); - bb.put((byte) LATEST_PROTOCOL_VERSION); - bb.putLong(id); - bb.putInt(version); - - out.write(bb.array()); - } - - @Override - public Map<String, String> getAttributes(final RecordSchema schema) { - 
return Collections.emptyMap(); - } - - @Override - public void validateSchema(RecordSchema schema) throws SchemaNotFoundException { - final SchemaIdentifier identifier = schema.getIdentifier(); - final OptionalLong identifierOption = identifier.getIdentifier(); - if (!identifierOption.isPresent()) { - throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known"); - } - - final OptionalInt versionOption = identifier.getVersion(); - if (!versionOption.isPresent()) { - throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known"); - } - } - - @Override - public Set<SchemaField> getRequiredSchemaFields() { - return requiredSchemaFields; - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java deleted file mode 100644 index 6635e3d..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.schema.access; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Set; - -import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.serialization.record.RecordSchema; - -public interface SchemaAccessStrategy { - /** - * Returns the schema for the given FlowFile using the supplied stream of content and configuration - * - * @param flowFile flowfile - * @param contentStream content of flowfile - * @param context configuration - * @return the RecordSchema for the FlowFile - */ - RecordSchema getSchema(FlowFile flowFile, InputStream contentStream, ConfigurationContext context) throws SchemaNotFoundException, IOException; - - /** - * @return the set of all Schema Fields that are supplied by the RecordSchema that is returned from {@link #getSchema(FlowFile, InputStream, ConfigurationContext)}. 
- */ - Set<SchemaField> getSuppliedSchemaFields(); -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessWriter.java deleted file mode 100644 index 30a995c..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessWriter.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.nifi.schema.access; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.Map; -import java.util.Set; - -import org.apache.nifi.serialization.record.RecordSchema; - -public interface SchemaAccessWriter { - - /** - * Writes the given Record Schema to the given OutputStream as header information, if appropriate, - * or returns without writing anything if the implementation does not need to write information to - * the contents of the FlowFile - * - * @param schema the schema to write - * @param out the OutputStream to write to - * @throws IOException if unable to write to the given stream - */ - void writeHeader(RecordSchema schema, OutputStream out) throws IOException; - - /** - * Returns a Map of String to String that represent the attributes that should be added to the FlowFile, or - * an empty map if no attributes should be added. - * - * @return a Map of attributes to add to the FlowFile. - */ - Map<String, String> getAttributes(RecordSchema schema); - - /** - * Ensures that the given schema can be written by this SchemaAccessWriter or throws SchemaNotFoundException if - * the schema does not contain sufficient information to be written - * - * @param schema the schema to validate - * @throws SchemaNotFoundException if the schema does not contain sufficient information to be written - */ - void validateSchema(RecordSchema schema) throws SchemaNotFoundException; - - /** - * Specifies the set of SchemaField's that are required in order to use this Schema Access Writer - * - * @return the set of SchemaField's that are required in order to use this Schema Access Writer - */ - Set<SchemaField> getRequiredSchemaFields(); -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java deleted file mode 100644 index 54a248d..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.nifi.schema.access; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.Collections; -import java.util.EnumSet; -import java.util.Map; -import java.util.Optional; -import java.util.Set; - -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.SchemaIdentifier; - -public class SchemaNameAsAttribute implements SchemaAccessWriter { - private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME); - private static final String SCHEMA_NAME_ATTRIBUTE = "schema.name"; - - @Override - public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException { - } - - @Override - public Map<String, String> getAttributes(final RecordSchema schema) { - final SchemaIdentifier identifier = schema.getIdentifier(); - final Optional<String> nameOption = identifier.getName(); - if (nameOption.isPresent()) { - return Collections.singletonMap(SCHEMA_NAME_ATTRIBUTE, nameOption.get()); - } - return Collections.emptyMap(); - } - - @Override - public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException { - final SchemaIdentifier schemaId = schema.getIdentifier(); - if (!schemaId.getName().isPresent()) { - throw new SchemaNotFoundException("Cannot write Schema Name As Attribute because the Schema Name is not known"); - } - } - - @Override - public Set<SchemaField> getRequiredSchemaFields() { - return schemaFields; - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java deleted file mode 100644 index bc21c1d..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.nifi.schema.access; - -import java.io.InputStream; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - -import org.apache.nifi.components.PropertyValue; -import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.schemaregistry.services.SchemaRegistry; -import org.apache.nifi.serialization.record.RecordSchema; - -public class SchemaNamePropertyStrategy implements SchemaAccessStrategy { - private final Set<SchemaField> schemaFields; - - private final SchemaRegistry schemaRegistry; - private final PropertyValue schemaNamePropertyValue; - - public SchemaNamePropertyStrategy(final SchemaRegistry schemaRegistry, final PropertyValue schemaNamePropertyValue) { - this.schemaRegistry = schemaRegistry; - this.schemaNamePropertyValue = schemaNamePropertyValue; - - schemaFields = new HashSet<>(); - schemaFields.add(SchemaField.SCHEMA_NAME); - schemaFields.addAll(schemaRegistry == null ? 
Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields()); - } - - @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final ConfigurationContext context) throws SchemaNotFoundException { - final String schemaName = schemaNamePropertyValue.evaluateAttributeExpressions(flowFile).getValue(); - if (schemaName.trim().isEmpty()) { - throw new SchemaNotFoundException("FlowFile did not contain appropriate attributes to determine Schema Name."); - } - - try { - final RecordSchema recordSchema = schemaRegistry.retrieveSchema(schemaName); - if (recordSchema == null) { - throw new SchemaNotFoundException("Could not find a schema with name '" + schemaName + "' in the configured Schema Registry"); - } - - return recordSchema; - } catch (final Exception e) { - throw new SchemaNotFoundException("Could not retrieve schema with name '" + schemaName + "' from the configured Schema Registry", e); - } - } - - @Override - public Set<SchemaField> getSuppliedSchemaFields() { - return schemaFields; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java deleted file mode 100644 index f39bdca..0000000 --- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package org.apache.nifi.schema.access;

import java.io.OutputStream;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.nifi.serialization.record.RecordSchema;

/**
 * A {@link SchemaAccessWriter} that conveys the schema by placing its full text into a
 * FlowFile attribute named "&lt;format&gt;.schema". Nothing is written to the FlowFile
 * content itself.
 */
public class SchemaTextAsAttribute implements SchemaAccessWriter {

    // Wrapped as unmodifiable so the backing EnumSet cannot be mutated by callers
    // of getRequiredSchemaFields().
    private static final Set<SchemaField> schemaFields =
        Collections.unmodifiableSet(EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT));

    @Override
    public void writeHeader(final RecordSchema schema, final OutputStream out) {
        // Intentionally empty: the schema is conveyed via attributes, not content.
    }

    /**
     * Returns a single attribute mapping "&lt;schema format&gt;.schema" to the schema text.
     * Precondition: {@link #validateSchema(RecordSchema)} has been invoked, so both the
     * schema format and schema text Optionals are present.
     */
    @Override
    public Map<String, String> getAttributes(final RecordSchema schema) {
        final Optional<String> textFormatOption = schema.getSchemaFormat();
        final Optional<String> textOption = schema.getSchemaText();
        // get() is safe here only because validateSchema() rejects schemas lacking
        // either value before this method is called.
        return Collections.singletonMap(textFormatOption.get() + ".schema", textOption.get());
    }

    /**
     * Verifies the schema carries both a text format and the text itself.
     *
     * @throws SchemaNotFoundException if either the format or the text is absent
     */
    @Override
    public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
        if (!schema.getSchemaFormat().isPresent()) {
            throw new SchemaNotFoundException("Cannot write Schema Text as Attribute because the Schema's Text Format is not present");
        }

        if (!schema.getSchemaText().isPresent()) {
            throw new SchemaNotFoundException("Cannot write Schema Text as Attribute because the Schema's Text is not present");
        }
    }

    @Override
    public Set<SchemaField> getRequiredSchemaFields() {
        return schemaFields;
    }
}
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java deleted file mode 100644 index d5ab8c5..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.nifi.serialization; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.serialization.record.RecordFieldType; - -public class DateTimeUtils { - public static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder() - .name("Date Format") - .description("Specifies the format to use when reading/writing Date fields") - .expressionLanguageSupported(false) - .defaultValue(RecordFieldType.DATE.getDefaultFormat()) - .addValidator(new SimpleDateFormatValidator()) - .required(true) - .build(); - - public static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder() - .name("Time Format") - .description("Specifies the format to use when reading/writing Time fields") - .expressionLanguageSupported(false) - .defaultValue(RecordFieldType.TIME.getDefaultFormat()) - .addValidator(new SimpleDateFormatValidator()) - .required(true) - .build(); - - public static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder() - .name("Timestamp Format") - .description("Specifies the format to use when reading/writing Timestamp fields") - .expressionLanguageSupported(false) - .defaultValue(RecordFieldType.TIMESTAMP.getDefaultFormat()) - .addValidator(new SimpleDateFormatValidator()) - .required(true) - .build(); -}
