This is an automated email from the ASF dual-hosted git repository. exceptionfactory pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 105fcb90e261894711ea1cf2bd089cf96f29facf Author: lizhizhou <[email protected]> AuthorDate: Thu Jan 12 16:47:51 2023 +0800 NIFI-10784 Added QueryIoTDBRecord Processor This closes #6844 Co-authored-by: David Handermann <[email protected]> Signed-off-by: David Handermann <[email protected]> (cherry picked from commit 7cb86dd42d52923bae57e2dc0763799f682222f5) --- .../nifi-iotdb-processors/pom.xml | 12 ++ .../org/apache/nifi/processors/AbstractIoTDB.java | 10 ++ .../apache/nifi/processors/QueryIoTDBRecord.java | 174 +++++++++++++++++++++ .../services/org.apache.nifi.processor.Processor | 1 + .../org/apache/nifi/processors/QueryIoTDBIT.java | 101 ++++++++++++ 5 files changed, 298 insertions(+) diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/pom.xml b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/pom.xml index 73e6aa462c..21a56177ba 100644 --- a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/pom.xml +++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/pom.xml @@ -77,6 +77,12 @@ <artifactId>iotdb-server</artifactId> <version>${iotdb.sdk.version}</version> <scope>test</scope> + <exclusions> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.iotdb</groupId> @@ -84,6 +90,12 @@ <version>${iotdb.sdk.version}</version> <type>test-jar</type> <scope>test</scope> + <exclusions> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.glassfish.jersey.inject</groupId> diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java index f45275a22d..70fa43cd81 100755 --- a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java +++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java @@ -64,6 +64,9 @@ public abstract class AbstractIoTDB extends AbstractProcessor { private static final Map<RecordFieldType, TSDataType> typeMap = new HashMap<>(); + private static final Map<String, RecordFieldType> reversedTypeMap = + new HashMap<>(); + static final Set<RecordFieldType> supportedType = new HashSet<>(); @@ -126,6 +129,9 @@ public abstract class AbstractIoTDB extends AbstractProcessor { typeMap.put(RecordFieldType.LONG, TSDataType.INT64); typeMap.put(RecordFieldType.FLOAT, TSDataType.FLOAT); typeMap.put(RecordFieldType.DOUBLE, TSDataType.DOUBLE); + for (Map.Entry<RecordFieldType, TSDataType> it : typeMap.entrySet()) { + reversedTypeMap.put(String.valueOf(it.getValue()),it.getKey()); + } supportedType.add(RecordFieldType.BOOLEAN); supportedType.add(RecordFieldType.STRING); @@ -184,6 +190,10 @@ public abstract class AbstractIoTDB extends AbstractProcessor { return typeMap.get(type); } + protected RecordFieldType getType(String type) { + return reversedTypeMap.get(type); + } + protected ValidationResult validateSchemaAttribute(String schemaAttribute) { JsonNode schema; try { diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/QueryIoTDBRecord.java b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/QueryIoTDBRecord.java new file mode 100755 index 0000000000..023b46980c --- /dev/null +++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/QueryIoTDBRecord.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors; + +import org.apache.iotdb.session.SessionDataSet; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.SimpleRecordSchema; + +import java.io.OutputStream; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.List; +import java.util.ArrayList; +import java.util.Collections; + +@SupportsBatching +@Tags({"IoT", "Timeseries"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Query Apache IoTDB and write results as Records") +@WritesAttributes({ + @WritesAttribute(attribute = QueryIoTDBRecord.IOTDB_ERROR_MESSAGE, description = "Error message written on query failures"), + @WritesAttribute(attribute = QueryIoTDBRecord.MIME_TYPE, description = "Content Type based on configured Record Set Writer") +}) +public class QueryIoTDBRecord extends AbstractIoTDB { + + public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() + .name("Query") + .displayName("Query") + .description("IoTDB query to be executed") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder() + .name("Fetch Size") + .displayName("Fetch Size") + .description("Maximum number of results to return in a single chunk. Configuring 1 or more enables result set chunking") + .defaultValue(String.valueOf(10_000)) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.createLongValidator(0, 100_000, true)) + .required(true) + .build(); + + public static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder() + .name("Record Writer") + .displayName("Record Writer") + .description("Service for writing IoTDB query results as records") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(true) + .build(); + + public static final String IOTDB_ERROR_MESSAGE = "iotdb.error.message"; + + public static final String MIME_TYPE = "mime.type"; + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> propertyDescriptors = new ArrayList<>(super.getSupportedPropertyDescriptors()); + propertyDescriptors.add(QUERY); + propertyDescriptors.add(FETCH_SIZE); + propertyDescriptors.add(RECORD_WRITER_FACTORY); + return Collections.unmodifiableList(propertyDescriptors); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue(); + final int fetchSize = context.getProperty(FETCH_SIZE).asInteger(); + final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class); + + try ( + SessionDataSet sessionDataSet = this.session.get().executeQueryStatement(query); + OutputStream outputStream = session.write(flowFile) + ) { + sessionDataSet.setFetchSize(fetchSize); + + final RecordSchema recordSchema = getRecordSchema(sessionDataSet); + final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), recordSchema, outputStream, flowFile); + while (sessionDataSet.hasNext()) { + final RowRecord rowRecord = sessionDataSet.next(); + final Record record = getRecord(recordSchema, rowRecord); + recordSetWriter.write(record); + } + + recordSetWriter.close(); + flowFile = session.putAttribute(flowFile, MIME_TYPE, recordSetWriter.getMimeType()); + session.transfer(flowFile, REL_SUCCESS); + } catch (final Exception e) { + flowFile = session.putAttribute(flowFile, IOTDB_ERROR_MESSAGE, e.getMessage()); + getLogger().error("IoTDB query failed {}", flowFile, e); + session.transfer(flowFile, REL_FAILURE); + } + } + + private Record getRecord(final RecordSchema schema, final RowRecord rowRecord) { + final Map<String, Object> row = new LinkedHashMap<>(); + final Iterator<String> recordFieldNames = schema.getFieldNames().iterator(); + + // Put Timestamp as first field + row.put(recordFieldNames.next(), rowRecord.getTimestamp()); + + final Iterator<Field> rowRecordFields = rowRecord.getFields().iterator(); + while (recordFieldNames.hasNext()) { + final String recordFieldName = recordFieldNames.next(); + if (rowRecordFields.hasNext()) { + final Field rowRecordField = rowRecordFields.next(); + final TSDataType dataType = rowRecordField.getDataType(); + final Object objectValue = rowRecordField.getObjectValue(dataType); + row.put(recordFieldName, objectValue); + } + } + return new MapRecord(schema, row); + } + + private RecordSchema getRecordSchema(final SessionDataSet sessionDataSet) { + final Iterator<String> columnTypes = sessionDataSet.getColumnTypes().iterator(); + final Iterator<String> columnNames = sessionDataSet.getColumnNames().iterator(); + + final List<RecordField> recordFields = new ArrayList<>(); + while (columnNames.hasNext()) { + final String recordFieldName = columnNames.next(); + final String columnType = columnTypes.next(); + final RecordFieldType recordFieldType = getType(columnType); + final DataType recordDataType = recordFieldType.getDataType(); + final RecordField recordField = new RecordField(recordFieldName, recordDataType); + recordFields.add(recordField); + } + return new SimpleRecordSchema(recordFields); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 4a19720633..962f2a5cf3 100755 --- a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -13,3 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. org.apache.nifi.processors.PutIoTDBRecord +org.apache.nifi.processors.QueryIoTDBRecord \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/QueryIoTDBIT.java b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/QueryIoTDBIT.java new file mode 100755 index 0000000000..6e641b268b --- /dev/null +++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/QueryIoTDBIT.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors; + +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.MockFlowFile; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +public class QueryIoTDBIT { + private static final String WRITER_SERVICE_ID = "writer"; + + private static final String DEVICE_ID = "root.sg7.d1"; + + private static final String FIRST_MEASUREMENT = "s0"; + + private static final String SECOND_MEASUREMENT = "s1"; + + private static final long TIMESTAMP = 1; + + private TestRunner testRunner; + private MockRecordWriter recordWriter; + private Session session; + + @BeforeEach + public void setRunner() throws IoTDBConnectionException, StatementExecutionException { + testRunner = TestRunners.newTestRunner(QueryIoTDBRecord.class); + recordWriter = new MockRecordWriter("header", true); + testRunner.setProperty(QueryIoTDBRecord.RECORD_WRITER_FACTORY, WRITER_SERVICE_ID); + testRunner.setProperty(QueryIoTDBRecord.IOTDB_HOST, "127.0.0.1"); + testRunner.setProperty(QueryIoTDBRecord.USERNAME, "root"); + testRunner.setProperty(QueryIoTDBRecord.PASSWORD, "root"); + session = new Session.Builder().build(); + session.open(); + + List<String> measurements = new ArrayList<>(2); + measurements.add(FIRST_MEASUREMENT); + measurements.add(SECOND_MEASUREMENT); + + List<String> values = new ArrayList<>(2); + values.add("5.0"); + values.add("6.0"); + session.insertRecord(DEVICE_ID, TIMESTAMP, measurements, values); + } + + @AfterEach + public void shutdown() throws Exception { + testRunner.shutdown(); + recordWriter.disabled(); + session.close(); + EnvironmentUtils.cleanEnv(); + EnvironmentUtils.shutdownDaemon(); + } + + @Test + public void testQueryIoTDBbyProperty() throws InitializationException { + setUpStandardTestConfig(); + + final String query = String.format("SELECT %s, %s FROM %s", FIRST_MEASUREMENT, SECOND_MEASUREMENT, DEVICE_ID); + testRunner.setProperty(QueryIoTDBRecord.QUERY, query); + testRunner.enqueue(new byte[]{}); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_SUCCESS, 1); + + final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(PutIoTDBRecord.REL_SUCCESS).get(0); + flowFile.assertContentEquals("header\n\"1\",\"5.0\",\"6.0\"\n"); + flowFile.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); + } + + private void setUpStandardTestConfig() throws InitializationException { + testRunner.addControllerService(WRITER_SERVICE_ID, recordWriter); + testRunner.enableControllerService(recordWriter); + } +}
