inigoao commented on a change in pull request #257: URL: https://github.com/apache/plc4x/pull/257#discussion_r663823273
########## File path: plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java ########## @@ -0,0 +1,130 @@ +package org.apache.plc4x.nifi.record; + +import java.io.Closeable; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.avro.Schema; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.plc4x.java.api.messages.PlcReadResponse; +import org.apache.plc4x.java.api.value.PlcValue; +import org.apache.plc4x.nifi.util.Plc4xCommon; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class Plc4xReadResponseRecordSet implements RecordSet, Closeable { + private static final Logger logger = LoggerFactory.getLogger(Plc4xReadResponseRecordSet.class); + private final PlcReadResponse readResponse; + private final Set<String> rsColumnNames; + private boolean moreRows; + + // TODO: review this AtomicReference? + // TODO: this could be enhanced checking if record schema should be updated (via a cache boolean, checking property values is a nifi expression language, etc) + private AtomicReference<RecordSchema> recordSchema; + + public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse) throws IOException { + this.readResponse = readResponse; + moreRows = true; + + logger.debug("Creating record schema from PlcReadResponse"); + Map<String, ? extends PlcValue> responseDataStructure = readResponse.getAsPlcValue().getStruct(); + rsColumnNames = responseDataStructure.keySet(); + + if (recordSchema == null) { + Schema avroSchema = Plc4xCommon.createSchema(responseDataStructure); //TODO review this method as it is the 'mapping' from PlcValues to avro datatypes + recordSchema = new AtomicReference<RecordSchema>(); + recordSchema.set(AvroTypeUtil.createSchema(avroSchema)); + } + logger.debug("Record schema from PlcReadResponse successfuly created."); + + } + + + @Override + public RecordSchema getSchema() { + return this.recordSchema.get(); + } + + // Protected methods for subclasses to access private member variables + protected PlcReadResponse getReadResponse() { + return readResponse; + } + + protected boolean hasMoreRows() { + return moreRows; + } + + protected void setMoreRows(boolean moreRows) { + this.moreRows = moreRows; + } + + @Override + public Record next() throws IOException { + if (moreRows) { + final Record record = createRecord(readResponse); + setMoreRows(false); + return record; + } else { + return null; + } + } + + @Override + public void close() { + //do nothing + } + + protected Record createRecord(final PlcReadResponse readResponse) throws IOException{ + final Map<String, Object> values = new HashMap<>(getSchema().getFieldCount()); + + logger.debug("creating record."); + + for (final RecordField field : getSchema().getFields()) { + final String fieldName = field.getFieldName(); + + final Object value; + + //TODO + if (rsColumnNames.contains(fieldName)) { + value = normalizeValue(readResponse.getObject(fieldName)); + } else { + value = null; + } + //TODO we are asuming that record schema is always inferred from request, not writen by the user, so maybe previous lines could be changed by the following one + // value = normalizeValue(readResponse.getObject(fieldName)); + Review comment: I dont understand what does the .trace mean.. could you please provide further info about it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@plc4x.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org