tpalfy commented on code in PR #8250: URL: https://github.com/apache/nifi/pull/8250#discussion_r1487902665
########## nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.protobuf; + +import com.squareup.wire.schema.CoreLoaderKt; +import com.squareup.wire.schema.Location; +import com.squareup.wire.schema.Schema; +import com.squareup.wire.schema.SchemaLoader; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import 
org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SchemaRegistryService; +import org.apache.nifi.services.protobuf.schema.ProtoSchemaStrategy; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileSystems; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +@Tags({"protobuf", "record", "reader", "parser"}) +@CapabilityDescription("Parses a Protocol Buffers message from binary format.") +public class ProtobufReader extends SchemaRegistryService implements RecordReaderFactory { + + private static final String ANY_PROTO = "google/protobuf/any.proto"; + private static final String DURATION_PROTO = "google/protobuf/duration.proto"; + private static final String EMPTY_PROTO = "google/protobuf/empty.proto"; + private static final String STRUCT_PROTO = "google/protobuf/struct.proto"; + private static final String TIMESTAMP_PROTO = "google/protobuf/timestamp.proto"; + private static final String WRAPPERS_PROTO = "google/protobuf/wrappers.proto"; + + private static final AllowableValue GENERATE_FROM_PROTO_FILE = new AllowableValue("generate-from-proto-file", + "Generate from Proto file", "The record schema is generated from the provided proto file"); + + private String message; + private Schema protoSchema; + + public static final PropertyDescriptor PROTOBUF_DIRECTORY = new PropertyDescriptor.Builder() + .name("Proto Directory") + .displayName("Proto Directory") + .description("Directory containing Protocol Buffers message definition (.proto) file(s).") + .required(true) + .addValidator(StandardValidators.createDirectoryExistsValidator(true, false)) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + + public static final PropertyDescriptor MESSAGE_TYPE = new 
PropertyDescriptor.Builder() + .name("Message Type") + .displayName("Message Type") + .description("Fully qualified name of the Protocol Buffers message type including its package (eg. mypackage.MyMessage). " + + "The .proto files configured in '" + PROTOBUF_DIRECTORY.getDisplayName() + "' must contain the definition of this message type.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(PROTOBUF_DIRECTORY); + properties.add(MESSAGE_TYPE); + return properties; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + final String protoDirectory = context.getProperty(PROTOBUF_DIRECTORY).evaluateAttributeExpressions().getValue(); + message = context.getProperty(MESSAGE_TYPE).evaluateAttributeExpressions().getValue(); Review Comment: The designation `message` is consistently used throughout this module when it's really about message type. It can be confusing. ```suggestion messageType = context.getProperty(MESSAGE_TYPE).evaluateAttributeExpressions().getValue(); ``` ########## nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtobufDataConverter.java: ########## @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.protobuf.converter; + +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.UnknownFieldSet; +import com.squareup.wire.schema.EnumType; +import com.squareup.wire.schema.Field; +import com.squareup.wire.schema.MessageType; +import com.squareup.wire.schema.OneOf; +import com.squareup.wire.schema.ProtoType; +import com.squareup.wire.schema.Schema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.RecordDataType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.services.protobuf.FieldType; +import org.apache.nifi.services.protobuf.schema.ProtoSchemaParser; + +import java.io.IOException; +import java.io.InputStream; +import java.math.BigInteger; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; + +import static com.google.protobuf.CodedInputStream.decodeZigZag32; +import static com.google.protobuf.TextFormat.unsignedToString; + +/** + * The class is responsible for creating Record by mapping the provided proto schema fields with the list of Unknown fields parsed from encoded proto data. 
+ */ +public class ProtobufDataConverter { + + public static final String MAP_KEY_FIELD_NAME = "key"; + public static final String MAP_VALUE_FIELD_NAME = "value"; + public static final String ANY_TYPE_URL_FIELD_NAME = "type_url"; + public static final String ANY_VALUE_FIELD_NAME = "value"; + public static final String ANY_MESSAGE_TYPE = "google.protobuf.Any"; + + private final Schema schema; + private final String message; + private final RecordSchema rootRecordSchema; + private final boolean coerceTypes; + private final boolean dropUnknownFields; + + private boolean containsAnyField = false; + + public ProtobufDataConverter(Schema schema, String message, RecordSchema recordSchema, boolean coerceTypes, boolean dropUnknownFields) { + this.schema = schema; + this.message = message; + this.rootRecordSchema = recordSchema; + this.coerceTypes = coerceTypes; + this.dropUnknownFields = dropUnknownFields; + } + + /** + * Creates a record from the root message. + * + * @return created record + * @throws IOException failed to read input stream + */ + public MapRecord createRecord(InputStream data) throws IOException { + final MessageType rootType = (MessageType) schema.getType(message); + Objects.requireNonNull(rootType, String.format("Message with name [%s] not found in the provided proto files", message)); + + MapRecord record = createRecord(rootType, ByteString.readFrom(data), rootRecordSchema); + if (containsAnyField) { + record.regenerateSchema(); + } + + return record; + } + + /** + * Creates a record for the provided message. 
+ * + * @param messageType message to create a record from + * @param data proto message data + * @param recordSchema record schema for the created record + * @return created record + * @throws InvalidProtocolBufferException failed to parse input data + */ + private MapRecord createRecord(MessageType messageType, ByteString data, RecordSchema recordSchema) throws InvalidProtocolBufferException { + final UnknownFieldSet unknownFieldSet = UnknownFieldSet.parseFrom(data); + + if ((ANY_MESSAGE_TYPE).equals(messageType.getType().toString())) { + containsAnyField = true; + return handleAnyField(unknownFieldSet); + } + + return new MapRecord(recordSchema, processMessageFields(messageType, unknownFieldSet), false, dropUnknownFields); + } + + /** + * Process declared, extension and oneOf fields in the provided message. + * + * @param messageType message with fields to be processed + * @param unknownFieldSet received proto data fields + * @return Map of processed fields + */ + private Map<String, Object> processMessageFields(MessageType messageType, UnknownFieldSet unknownFieldSet) throws InvalidProtocolBufferException { + Map<String, Object> recordValues = new HashMap<>(); + + for (final Field field : messageType.getDeclaredFields()) { + getField(new ProtoField(field), unknownFieldSet.getField(field.getTag()), recordValues); + } + + for (final Field field : messageType.getExtensionFields()) { + getField(new ProtoField(field), unknownFieldSet.getField(field.getTag()), recordValues); + } + + for (final OneOf oneOf : messageType.getOneOfs()) { + for (Field field : oneOf.getFields()) { + getField(new ProtoField(field), unknownFieldSet.getField(field.getTag()), recordValues); + } + } + return recordValues; + } + + /** + * Checks the field value's presence and sets it into the result Map. 
+ * + * @param protoField proto field's properties + * @param unknownField field's value + * @param values Map of values + */ + private void getField(ProtoField protoField, UnknownFieldSet.Field unknownField, Map<String, Object> values) throws InvalidProtocolBufferException { + Optional<Object> fieldValue = convertFieldValues(protoField, unknownField); + fieldValue.ifPresent(o -> values.put(protoField.getFieldName(), o)); + } + + private Optional<Object> convertFieldValues(ProtoField protoField, UnknownFieldSet.Field unknownField) throws InvalidProtocolBufferException { + if (!unknownField.getLengthDelimitedList().isEmpty()) { + return Optional.of(convertLengthDelimitedFields(protoField, unknownField.getLengthDelimitedList())); + } + if (!unknownField.getFixed32List().isEmpty()) { + return Optional.of(convertFixed32Fields(protoField, unknownField.getFixed32List())); + } + if (!unknownField.getFixed64List().isEmpty()) { + return Optional.of(convertFixed64Fields(protoField, unknownField.getFixed64List())); + } + if (!unknownField.getVarintList().isEmpty()) { + return Optional.of(convertVarintFields(protoField, unknownField.getVarintList())); + } + + return Optional.empty(); + } + + /** + * Converts a Length-Delimited field value into it's suitable data type. 
+ * + * @param protoField proto field's properties + * @param values field's unprocessed values + * @return converted field values + * @throws InvalidProtocolBufferException failed to parse input data + */ + private Object convertLengthDelimitedFields(ProtoField protoField, List<ByteString> values) throws InvalidProtocolBufferException { + final ProtoType protoType = protoField.getProtoType(); + if (protoType.isScalar()) { + switch (FieldType.findValue(protoType.getSimpleName())) { + case STRING: + return resolveFieldValue(protoField, values, ByteString::toStringUtf8); + case BYTES: + return resolveFieldValue(protoField, values, ByteString::toByteArray); + default: + throw new IllegalStateException(String.format("Incompatible value was received for field [%s]," + " [%s] is not LengthDelimited field type", protoField.getFieldName(), protoType.getSimpleName())); + } Review Comment: Might be easier to maintain and understand if we got rid of the `FieldType` enum and used predefined maps for the type-specific converter functions. 
```suggestion private Map<String, Function<ByteString, Object>> scalarLengthDelimitedValueProviders = Map.of( "string", ByteString::toStringUtf8, "bytes", ByteString::toByteArray ); private Object convertLengthDelimitedFields(ProtoField protoField, List<ByteString> values) throws InvalidProtocolBufferException { final Object convertedValue; final ProtoType protoType = protoField.getProtoType(); if (protoType.isScalar()) { final Function<ByteString, Object> valueConverter = scalarLengthDelimitedValueProviders.get(protoType.getSimpleName()); if (valueConverter == null) { throw new IllegalStateException(String.format("Incompatible value was received for field [%s]," + " [%s] is not LengthDelimited field type", protoField.getFieldName(), protoType.getSimpleName())); } else { convertedValue = resolveFieldValue(protoField, values, valueConverter); } ``` ########## nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtobufDataConverter.java: ########## @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.services.protobuf.converter; + +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.UnknownFieldSet; +import com.squareup.wire.schema.EnumType; +import com.squareup.wire.schema.Field; +import com.squareup.wire.schema.MessageType; +import com.squareup.wire.schema.OneOf; +import com.squareup.wire.schema.ProtoType; +import com.squareup.wire.schema.Schema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.RecordDataType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.services.protobuf.FieldType; +import org.apache.nifi.services.protobuf.schema.ProtoSchemaParser; + +import java.io.IOException; +import java.io.InputStream; +import java.math.BigInteger; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; + +import static com.google.protobuf.CodedInputStream.decodeZigZag32; +import static com.google.protobuf.TextFormat.unsignedToString; + +/** + * The class is responsible for creating Record by mapping the provided proto schema fields with the list of Unknown fields parsed from encoded proto data. 
+ */ +public class ProtobufDataConverter { + + public static final String MAP_KEY_FIELD_NAME = "key"; + public static final String MAP_VALUE_FIELD_NAME = "value"; + public static final String ANY_TYPE_URL_FIELD_NAME = "type_url"; + public static final String ANY_VALUE_FIELD_NAME = "value"; + public static final String ANY_MESSAGE_TYPE = "google.protobuf.Any"; + + private final Schema schema; + private final String message; + private final RecordSchema rootRecordSchema; + private final boolean coerceTypes; + private final boolean dropUnknownFields; + + private boolean containsAnyField = false; + + public ProtobufDataConverter(Schema schema, String message, RecordSchema recordSchema, boolean coerceTypes, boolean dropUnknownFields) { + this.schema = schema; + this.message = message; + this.rootRecordSchema = recordSchema; + this.coerceTypes = coerceTypes; + this.dropUnknownFields = dropUnknownFields; + } + + /** + * Creates a record from the root message. + * + * @return created record + * @throws IOException failed to read input stream + */ + public MapRecord createRecord(InputStream data) throws IOException { + final MessageType rootType = (MessageType) schema.getType(message); + Objects.requireNonNull(rootType, String.format("Message with name [%s] not found in the provided proto files", message)); + + MapRecord record = createRecord(rootType, ByteString.readFrom(data), rootRecordSchema); + if (containsAnyField) { + record.regenerateSchema(); + } + + return record; + } + + /** + * Creates a record for the provided message. 
+ * + * @param messageType message to create a record from + * @param data proto message data + * @param recordSchema record schema for the created record + * @return created record + * @throws InvalidProtocolBufferException failed to parse input data + */ + private MapRecord createRecord(MessageType messageType, ByteString data, RecordSchema recordSchema) throws InvalidProtocolBufferException { + final UnknownFieldSet unknownFieldSet = UnknownFieldSet.parseFrom(data); + + if ((ANY_MESSAGE_TYPE).equals(messageType.getType().toString())) { + containsAnyField = true; + return handleAnyField(unknownFieldSet); + } + + return new MapRecord(recordSchema, processMessageFields(messageType, unknownFieldSet), false, dropUnknownFields); + } + + /** + * Process declared, extension and oneOf fields in the provided message. + * + * @param messageType message with fields to be processed + * @param unknownFieldSet received proto data fields + * @return Map of processed fields + */ + private Map<String, Object> processMessageFields(MessageType messageType, UnknownFieldSet unknownFieldSet) throws InvalidProtocolBufferException { + Map<String, Object> recordValues = new HashMap<>(); + + for (final Field field : messageType.getDeclaredFields()) { + getField(new ProtoField(field), unknownFieldSet.getField(field.getTag()), recordValues); + } + + for (final Field field : messageType.getExtensionFields()) { + getField(new ProtoField(field), unknownFieldSet.getField(field.getTag()), recordValues); + } + + for (final OneOf oneOf : messageType.getOneOfs()) { + for (Field field : oneOf.getFields()) { + getField(new ProtoField(field), unknownFieldSet.getField(field.getTag()), recordValues); + } + } + return recordValues; + } + + /** + * Checks the field value's presence and sets it into the result Map. 
+ * + * @param protoField proto field's properties + * @param unknownField field's value + * @param values Map of values + */ + private void getField(ProtoField protoField, UnknownFieldSet.Field unknownField, Map<String, Object> values) throws InvalidProtocolBufferException { + Optional<Object> fieldValue = convertFieldValues(protoField, unknownField); + fieldValue.ifPresent(o -> values.put(protoField.getFieldName(), o)); + } + + private Optional<Object> convertFieldValues(ProtoField protoField, UnknownFieldSet.Field unknownField) throws InvalidProtocolBufferException { + if (!unknownField.getLengthDelimitedList().isEmpty()) { + return Optional.of(convertLengthDelimitedFields(protoField, unknownField.getLengthDelimitedList())); + } + if (!unknownField.getFixed32List().isEmpty()) { + return Optional.of(convertFixed32Fields(protoField, unknownField.getFixed32List())); + } + if (!unknownField.getFixed64List().isEmpty()) { + return Optional.of(convertFixed64Fields(protoField, unknownField.getFixed64List())); + } + if (!unknownField.getVarintList().isEmpty()) { + return Optional.of(convertVarintFields(protoField, unknownField.getVarintList())); + } + + return Optional.empty(); + } + + /** + * Converts a Length-Delimited field value into it's suitable data type. 
+ * + * @param protoField proto field's properties + * @param values field's unprocessed values + * @return converted field values + * @throws InvalidProtocolBufferException failed to parse input data + */ + private Object convertLengthDelimitedFields(ProtoField protoField, List<ByteString> values) throws InvalidProtocolBufferException { + final ProtoType protoType = protoField.getProtoType(); + if (protoType.isScalar()) { + switch (FieldType.findValue(protoType.getSimpleName())) { + case STRING: + return resolveFieldValue(protoField, values, ByteString::toStringUtf8); + case BYTES: + return resolveFieldValue(protoField, values, ByteString::toByteArray); + default: + throw new IllegalStateException(String.format("Incompatible value was received for field [%s]," + + " [%s] is not LengthDelimited field type", protoField.getFieldName(), protoType.getSimpleName())); + } + } else if (protoType.isMap()) { + return createMap(protoType, values); + } else { + final MessageType messageType = (MessageType) schema.getType(protoType); + Objects.requireNonNull(messageType, String.format("Message with name [%s] not found in the provided proto files", protoType)); + + final Function<ByteString, Object> getRecord = v -> { + try { + Optional<DataType> recordDataType = rootRecordSchema.getDataType(protoField.getFieldName()); + RecordSchema recordSchema = recordDataType.map(dataType -> + ((RecordDataType) dataType).getChildSchema()).orElse(generateRecordSchema(messageType.getType().toString())); + return createRecord(messageType, v, recordSchema); + } catch (InvalidProtocolBufferException e) { + throw new IllegalStateException("Failed to create record from the provided input data for field " + protoField.getFieldName(), e); + } + }; + + return resolveFieldValue(protoField, values, getRecord); + } + } + + /** + * Converts a Fixed32 field value into it's suitable data type. 
+ * + * @param protoField proto field's properties + * @param values field's unprocessed values + * @return converted field values + */ + private Object convertFixed32Fields(ProtoField protoField, List<Integer> values) { + final String typeName = protoField.getProtoType().getSimpleName(); + switch (FieldType.findValue(typeName)) { + case FIXED32: + return resolveFieldValue(protoField, values, v -> Long.parseLong(unsignedToString(v))); + case SFIXED32: + return resolveFieldValue(protoField, values, v -> v); + case FLOAT: + return resolveFieldValue(protoField, values, Float::intBitsToFloat); + default: + throw new IllegalStateException(String.format("Incompatible value was received for field [%s]," + + " [%s] is not Fixed32 field type", protoField.getFieldName(), typeName)); + } + } + + /** + * Converts a Fixed64 field value into it's suitable data type. + * + * @param protoField proto field's properties + * @param values field's unprocessed values + * @return converted field values + */ + private Object convertFixed64Fields(ProtoField protoField, List<Long> values) { + final String typeName = protoField.getProtoType().getSimpleName(); + switch (FieldType.findValue(typeName)) { + case FIXED64: + return resolveFieldValue(protoField, values, v -> new BigInteger(unsignedToString(v))); + case SFIXED64: + return resolveFieldValue(protoField, values, v -> v); + case DOUBLE: + return resolveFieldValue(protoField, values, Double::longBitsToDouble); + default: + throw new IllegalStateException(String.format("Incompatible value was received for field [%s]," + + " [%s] is not Fixed64 field type", protoField.getFieldName(), typeName)); + + } + } + + /** + * Converts a Varint field value into it's suitable data type. 
+ * + * @param protoField proto field's properties + * @param values field's unprocessed values + * @return converted field values + */ + private Object convertVarintFields(ProtoField protoField, List<Long> values) { + final ProtoType protoType = protoField.getProtoType(); + if (protoType.isScalar()) { + switch (FieldType.findValue(protoType.getSimpleName())) { + case BOOL: + return resolveFieldValue(protoField, values, v -> v.equals(1L)); + case INT32: + case SFIXED32: + return resolveFieldValue(protoField, values, Long::intValue); + case UINT32: + case INT64: + case SFIXED64: + return resolveFieldValue(protoField, values, v -> v); + case UINT64: + return resolveFieldValue(protoField, values, v -> new BigInteger(unsignedToString(v))); + case SINT32: + return resolveFieldValue(protoField, values, v -> decodeZigZag32(v.intValue())); + case SINT64: + return resolveFieldValue(protoField, values, CodedInputStream::decodeZigZag64); + default: + throw new IllegalStateException(String.format("Incompatible value was received for field [%s]," + + " [%s] is not Varint field type", protoField.getFieldName(), protoType.getSimpleName())); + } + } else { + final Function<Long, Object> enumFunction = v -> { + final EnumType enumType = (EnumType) schema.getType(protoType); + Objects.requireNonNull(enumType, String.format("Enum with name [%s] not found in the provided proto files", protoType)); + return enumType.constant(Integer.parseInt(v.toString())).getName(); + }; + + return resolveFieldValue(protoField, values, enumFunction); + } + } + + private <T> Object resolveFieldValue(ProtoField protoField, List<T> values, Function<T, Object> getValue) { Review Comment: ```suggestion private <T> Object resolveFieldValue(ProtoField protoField, List<T> values, Function<T, Object> valueConverter) { ``` ########## nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufRecordReader.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.protobuf; + +import com.squareup.wire.schema.Schema; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.services.protobuf.converter.ProtobufDataConverter; + +import java.io.IOException; +import java.io.InputStream; + +public class ProtobufRecordReader implements RecordReader { + + private final Schema protoSchema; + private final String message; + private final InputStream inputStream; + private RecordSchema recordSchema; + private boolean inputProcessed; + + public ProtobufRecordReader(InputStream inputStream, String message, Schema protoSchema, RecordSchema recordSchema) { Review Comment: ```suggestion I feel the ordering of the fields and parameters could be more consistent. ``` ########## nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.protobuf; + +import com.squareup.wire.schema.CoreLoaderKt; +import com.squareup.wire.schema.Location; +import com.squareup.wire.schema.Schema; +import com.squareup.wire.schema.SchemaLoader; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SchemaRegistryService; +import org.apache.nifi.services.protobuf.schema.ProtoSchemaStrategy; + +import 
java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileSystems; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +@Tags({"protobuf", "record", "reader", "parser"}) +@CapabilityDescription("Parses a Protocol Buffers message from binary format.") +public class ProtobufReader extends SchemaRegistryService implements RecordReaderFactory { + + private static final String ANY_PROTO = "google/protobuf/any.proto"; + private static final String DURATION_PROTO = "google/protobuf/duration.proto"; + private static final String EMPTY_PROTO = "google/protobuf/empty.proto"; + private static final String STRUCT_PROTO = "google/protobuf/struct.proto"; + private static final String TIMESTAMP_PROTO = "google/protobuf/timestamp.proto"; + private static final String WRAPPERS_PROTO = "google/protobuf/wrappers.proto"; + + private static final AllowableValue GENERATE_FROM_PROTO_FILE = new AllowableValue("generate-from-proto-file", + "Generate from Proto file", "The record schema is generated from the provided proto file"); + + private String message; + private Schema protoSchema; Review Comment: ```suggestion private volatile String message; private volatile Schema protoSchema; ``` ########## nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtoField.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.protobuf.converter; + +import com.squareup.wire.schema.Field; +import com.squareup.wire.schema.ProtoType; + +public class ProtoField { + + private final String fieldName; + private final ProtoType protoType; + private final boolean repeatable; + + public ProtoField(String fieldName, ProtoType protoType, boolean repeatable) { + this.fieldName = fieldName; + this.protoType = protoType; + this.repeatable = repeatable; + } + + public ProtoField(Field field) { + this.fieldName = field.getName(); + this.protoType = field.getType(); + this.repeatable = field.isRepeated(); + } Review Comment: ```suggestion public ProtoField(Field field) { this(field.getName(), field.getType(),field.isRepeated()); } public ProtoField(String fieldName, ProtoType protoType, boolean repeatable) { this.fieldName = fieldName; this.protoType = protoType; this.repeatable = repeatable; } ``` ########## nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.protobuf; + +import com.squareup.wire.schema.CoreLoaderKt; +import com.squareup.wire.schema.Location; +import com.squareup.wire.schema.Schema; +import com.squareup.wire.schema.SchemaLoader; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SchemaRegistryService; +import org.apache.nifi.services.protobuf.schema.ProtoSchemaStrategy; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileSystems; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +@Tags({"protobuf", "record", "reader", "parser"}) 
+@CapabilityDescription("Parses a Protocol Buffers message from binary format.") +public class ProtobufReader extends SchemaRegistryService implements RecordReaderFactory { + + private static final String ANY_PROTO = "google/protobuf/any.proto"; + private static final String DURATION_PROTO = "google/protobuf/duration.proto"; + private static final String EMPTY_PROTO = "google/protobuf/empty.proto"; + private static final String STRUCT_PROTO = "google/protobuf/struct.proto"; + private static final String TIMESTAMP_PROTO = "google/protobuf/timestamp.proto"; + private static final String WRAPPERS_PROTO = "google/protobuf/wrappers.proto"; + + private static final AllowableValue GENERATE_FROM_PROTO_FILE = new AllowableValue("generate-from-proto-file", + "Generate from Proto file", "The record schema is generated from the provided proto file"); + + private String message; + private Schema protoSchema; + + public static final PropertyDescriptor PROTOBUF_DIRECTORY = new PropertyDescriptor.Builder() + .name("Proto Directory") + .displayName("Proto Directory") + .description("Directory containing Protocol Buffers message definition (.proto) file(s).") + .required(true) + .addValidator(StandardValidators.createDirectoryExistsValidator(true, false)) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + + public static final PropertyDescriptor MESSAGE_TYPE = new PropertyDescriptor.Builder() + .name("Message Type") + .displayName("Message Type") + .description("Fully qualified name of the Protocol Buffers message type including its package (eg. mypackage.MyMessage). 
" + + "The .proto files configured in '" + PROTOBUF_DIRECTORY.getDisplayName() + "' must contain the definition of this message type.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(PROTOBUF_DIRECTORY); + properties.add(MESSAGE_TYPE); + return properties; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + final String protoDirectory = context.getProperty(PROTOBUF_DIRECTORY).evaluateAttributeExpressions().getValue(); + message = context.getProperty(MESSAGE_TYPE).evaluateAttributeExpressions().getValue(); + + final SchemaLoader schemaLoader = new SchemaLoader(FileSystems.getDefault()); + schemaLoader.initRoots(Arrays.asList(Location.get(protoDirectory), + Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, ANY_PROTO), + Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, DURATION_PROTO), + Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, EMPTY_PROTO), + Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, STRUCT_PROTO), + Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, TIMESTAMP_PROTO), + Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, WRAPPERS_PROTO)), Collections.emptyList()); + protoSchema = schemaLoader.loadSchema(); + } + + @Override + protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext context) { Review Comment: This called from the `@OnEnabled` `SchemaRegistryService.storeSchemaAccessStrategy`. At the same time it depends on `ProtobufReader`'s own `onEnabled` method. My Java runtime gathers the `@OnEnabled` methods in an alphanumeric order of the name class so it works but we probably shouldn't rely on that. 
########## nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtobufDataConverter.java: ########## @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.protobuf.converter; + +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.UnknownFieldSet; +import com.squareup.wire.schema.EnumType; +import com.squareup.wire.schema.Field; +import com.squareup.wire.schema.MessageType; +import com.squareup.wire.schema.OneOf; +import com.squareup.wire.schema.ProtoType; +import com.squareup.wire.schema.Schema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.RecordDataType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.services.protobuf.FieldType; +import org.apache.nifi.services.protobuf.schema.ProtoSchemaParser; + +import 
java.io.IOException; +import java.io.InputStream; +import java.math.BigInteger; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; + +import static com.google.protobuf.CodedInputStream.decodeZigZag32; +import static com.google.protobuf.TextFormat.unsignedToString; + +/** + * The class is responsible for creating Record by mapping the provided proto schema fields with the list of Unknown fields parsed from encoded proto data. + */ +public class ProtobufDataConverter { + + public static final String MAP_KEY_FIELD_NAME = "key"; + public static final String MAP_VALUE_FIELD_NAME = "value"; + public static final String ANY_TYPE_URL_FIELD_NAME = "type_url"; + public static final String ANY_VALUE_FIELD_NAME = "value"; + public static final String ANY_MESSAGE_TYPE = "google.protobuf.Any"; + + private final Schema schema; + private final String message; + private final RecordSchema rootRecordSchema; + private final boolean coerceTypes; + private final boolean dropUnknownFields; + + private boolean containsAnyField = false; + + public ProtobufDataConverter(Schema schema, String message, RecordSchema recordSchema, boolean coerceTypes, boolean dropUnknownFields) { + this.schema = schema; + this.message = message; + this.rootRecordSchema = recordSchema; + this.coerceTypes = coerceTypes; + this.dropUnknownFields = dropUnknownFields; + } + + /** + * Creates a record from the root message. 
+ * + * @return created record + * @throws IOException failed to read input stream + */ + public MapRecord createRecord(InputStream data) throws IOException { + final MessageType rootType = (MessageType) schema.getType(message); + Objects.requireNonNull(rootType, String.format("Message with name [%s] not found in the provided proto files", message)); + + MapRecord record = createRecord(rootType, ByteString.readFrom(data), rootRecordSchema); + if (containsAnyField) { + record.regenerateSchema(); + } + + return record; + } + + /** + * Creates a record for the provided message. + * + * @param messageType message to create a record from + * @param data proto message data + * @param recordSchema record schema for the created record + * @return created record + * @throws InvalidProtocolBufferException failed to parse input data + */ + private MapRecord createRecord(MessageType messageType, ByteString data, RecordSchema recordSchema) throws InvalidProtocolBufferException { + final UnknownFieldSet unknownFieldSet = UnknownFieldSet.parseFrom(data); + + if ((ANY_MESSAGE_TYPE).equals(messageType.getType().toString())) { + containsAnyField = true; + return handleAnyField(unknownFieldSet); + } + + return new MapRecord(recordSchema, processMessageFields(messageType, unknownFieldSet), false, dropUnknownFields); + } + + /** + * Process declared, extension and oneOf fields in the provided message. 
+ * + * @param messageType message with fields to be processed + * @param unknownFieldSet received proto data fields + * @return Map of processed fields + */ + private Map<String, Object> processMessageFields(MessageType messageType, UnknownFieldSet unknownFieldSet) throws InvalidProtocolBufferException { + Map<String, Object> recordValues = new HashMap<>(); + + for (final Field field : messageType.getDeclaredFields()) { + getField(new ProtoField(field), unknownFieldSet.getField(field.getTag()), recordValues); + } + + for (final Field field : messageType.getExtensionFields()) { + getField(new ProtoField(field), unknownFieldSet.getField(field.getTag()), recordValues); + } + + for (final OneOf oneOf : messageType.getOneOfs()) { + for (Field field : oneOf.getFields()) { + getField(new ProtoField(field), unknownFieldSet.getField(field.getTag()), recordValues); + } + } + return recordValues; + } + + /** + * Checks the field value's presence and sets it into the result Map. + * + * @param protoField proto field's properties + * @param unknownField field's value + * @param values Map of values + */ + private void getField(ProtoField protoField, UnknownFieldSet.Field unknownField, Map<String, Object> values) throws InvalidProtocolBufferException { Review Comment: Not a fan of methods with in-out parameters but if we think it's worth it to make the code more readable a more appropriate signature would be welcome. ```suggestion private void collectFieldValue(Map<String, Object> fieldNameToConvertedValue, ProtoField protoField, UnknownFieldSet.Field unknownField) throws InvalidProtocolBufferException { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
