This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new ad71916d94 NIFI-14486 Upgraded IoTDB libraries from 1.3.2 to 2.0.2
ad71916d94 is described below
commit ad71916d94b1ef86b8ad54fc16130c13d799ea4b
Author: exceptionfactory <[email protected]>
AuthorDate: Mon Apr 21 08:33:01 2025 -0500
NIFI-14486 Upgraded IoTDB libraries from 1.3.2 to 2.0.2
Signed-off-by: Pierre Villard <[email protected]>
This closes #9886.
---
.../org/apache/nifi/processors/AbstractIoTDB.java | 15 +++++----
.../org/apache/nifi/processors/PutIoTDBRecord.java | 39 +++++++++-------------
.../apache/nifi/processors/QueryIoTDBRecord.java | 6 ++--
.../nifi/processors/model/DatabaseField.java | 6 ++--
.../nifi/processors/model/DatabaseSchema.java | 6 ++--
.../apache/nifi/processors/AbstractIoTDBTest.java | 20 ++++++-----
.../apache/nifi/processors/PutIoTDBRecordTest.java | 6 ++--
nifi-extension-bundles/nifi-iotdb-bundle/pom.xml | 2 +-
8 files changed, 48 insertions(+), 52 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java
b/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java
index df034d9639..c81aaf24b4 100755
---
a/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java
+++
b/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java
@@ -36,12 +36,13 @@ import org.apache.nifi.processors.model.DatabaseField;
import org.apache.nifi.processors.model.DatabaseSchema;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.session.Session;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
@@ -312,7 +313,7 @@ public abstract class AbstractIoTDB extends
AbstractProcessor {
final Map<String, Tablet> tablets = new LinkedHashMap<>();
deviceMeasurementMap.forEach(
(device, measurements) -> {
- ArrayList<MeasurementSchema> schemas = new ArrayList<>();
+ final List<IMeasurementSchema> schemas = new ArrayList<>();
for (String measurement : measurements) {
TSDataType dataType = schema.getDataType(measurement);
TSEncoding encoding =
schema.getEncodingType(measurement);
diff --git
a/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDBRecord.java
b/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDBRecord.java
index a677840a22..e588fcb171 100755
---
a/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDBRecord.java
+++
b/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDBRecord.java
@@ -25,9 +25,9 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.nifi.processors.model.DatabaseSchema;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@@ -152,17 +152,17 @@ public class PutIoTDBRecord extends AbstractIoTDB {
for (final Map.Entry<String, Tablet> entry :
tablets.entrySet()) {
Tablet tablet = entry.getValue();
- int rowIndex = tablet.rowSize++;
+ int rowIndex = tablet.getRowSize() + 1;
tablet.addTimestamp(rowIndex, timestamp);
- List<MeasurementSchema> measurements = tablet.getSchemas();
- for (MeasurementSchema measurement : measurements) {
- String id = measurement.getMeasurementId();
+ List<IMeasurementSchema> measurements =
tablet.getSchemas();
+ for (IMeasurementSchema measurement : measurements) {
+ String id = measurement.getMeasurementName();
TSDataType type = measurement.getType();
Object value = getTypedValue(record.getValue(id),
type);
tablet.addValue(id, rowIndex, value);
}
- filled = tablet.rowSize == tablet.getMaxRowNumber();
+ filled = tablet.getRowSize() == tablet.getMaxRowNumber();
}
if (filled) {
if (aligned) {
@@ -177,7 +177,7 @@ public class PutIoTDBRecord extends AbstractIoTDB {
final AtomicBoolean remaining = new AtomicBoolean(false);
tablets.forEach(
(device, tablet) -> {
- if (!remaining.get() && tablet.rowSize != 0) {
+ if (!remaining.get() && tablet.getRowSize() != 0) {
remaining.set(true);
}
});
@@ -214,20 +214,13 @@ public class PutIoTDBRecord extends AbstractIoTDB {
private long getTimestamp(final String timeField, final Record record) {
final long timestamp;
final Object time = record.getValue(timeField);
- if (time instanceof Timestamp) {
- Timestamp temp = (Timestamp) time;
- timestamp = temp.getTime();
- } else if (time instanceof Time) {
- Time temp = (Time) time;
- timestamp = temp.getTime();
- } else if (time instanceof Date) {
- Date temp = (Date) time;
- timestamp = temp.getTime();
- } else if (time instanceof Long) {
- timestamp = (Long) time;
- } else {
- throw new IllegalArgumentException(String.format("Unexpected Time
Field Type: %s", time));
- }
+ timestamp = switch (time) {
+ case Timestamp temp -> temp.getTime();
+ case Time temp -> temp.getTime();
+ case Date temp -> temp.getTime();
+ case Long number -> number;
+ case null, default -> throw new
IllegalArgumentException(String.format("Unexpected Time Field Type: %s", time));
+ };
return timestamp;
}
diff --git
a/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/QueryIoTDBRecord.java
b/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/QueryIoTDBRecord.java
index 6670608945..d54ddc0d56 100755
---
a/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/QueryIoTDBRecord.java
+++
b/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/QueryIoTDBRecord.java
@@ -17,9 +17,9 @@
package org.apache.nifi.processors;
import org.apache.iotdb.isession.SessionDataSet;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Field;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.Field;
+import org.apache.tsfile.read.common.RowRecord;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
diff --git
a/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/model/DatabaseField.java
b/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/model/DatabaseField.java
index 4c944cdf4c..30b0be2c18 100755
---
a/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/model/DatabaseField.java
+++
b/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/model/DatabaseField.java
@@ -19,9 +19,9 @@ package org.apache.nifi.processors.model;
import java.util.HashMap;
import java.util.Set;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
public class DatabaseField {
private String tsName;
diff --git
a/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/model/DatabaseSchema.java
b/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/model/DatabaseSchema.java
index 9d0a58d842..9ad783ffae 100755
---
a/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/model/DatabaseSchema.java
+++
b/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/model/DatabaseSchema.java
@@ -24,9 +24,9 @@ import java.util.stream.Collectors;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
public class DatabaseSchema {
private final Map<String, DatabaseField> fieldMap;
diff --git
a/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/AbstractIoTDBTest.java
b/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/AbstractIoTDBTest.java
index 05ed921258..beea298b84 100755
---
a/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/AbstractIoTDBTest.java
+++
b/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/AbstractIoTDBTest.java
@@ -19,10 +19,11 @@ package org.apache.nifi.processors;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.processors.model.DatabaseSchema;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
@@ -38,6 +39,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -219,7 +221,7 @@ public class AbstractIoTDBTest {
Map<String, Tablet> tablets = processor.generateTablets(schema,
"root.test_sg.test_d1.", 1);
Map<String, Tablet> exceptedTablets = new HashMap<>();
- List<MeasurementSchema> schemas = Arrays.asList(
+ List<IMeasurementSchema> schemas = Arrays.asList(
new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE),
new MeasurementSchema("s2", TSDataType.DOUBLE, TSEncoding.PLAIN));
exceptedTablets.put("root.test_sg.test_d1", new
Tablet("root.test_sg.test_d1", schemas, 1));
@@ -227,10 +229,10 @@ public class AbstractIoTDBTest {
assertEquals("root.test_sg.test_d1", tablets.keySet().toArray()[0]);
assertEquals(exceptedTablets.get("root.test_sg.test_d1").getSchemas(),
tablets.get("root.test_sg.test_d1").getSchemas());
assertEquals(exceptedTablets.get("root.test_sg.test_d1").getMaxRowNumber(),
tablets.get("root.test_sg.test_d1").getMaxRowNumber());
-
assertEquals(exceptedTablets.get("root.test_sg.test_d1").getTimeBytesSize(),
tablets.get("root.test_sg.test_d1").getTimeBytesSize());
-
assertEquals(exceptedTablets.get("root.test_sg.test_d1").getTotalValueOccupation(),
tablets.get("root.test_sg.test_d1").getTotalValueOccupation());
- assertEquals(exceptedTablets.get("root.test_sg.test_d1").deviceId,
tablets.get("root.test_sg.test_d1").deviceId);
- assertEquals(exceptedTablets.get("root.test_sg.test_d1").rowSize,
tablets.get("root.test_sg.test_d1").rowSize);
+
assertArrayEquals(exceptedTablets.get("root.test_sg.test_d1").getTimestamps(),
tablets.get("root.test_sg.test_d1").getTimestamps());
+
assertArrayEquals(exceptedTablets.get("root.test_sg.test_d1").getValues(),
tablets.get("root.test_sg.test_d1").getValues());
+
assertEquals(exceptedTablets.get("root.test_sg.test_d1").getDeviceId(),
tablets.get("root.test_sg.test_d1").getDeviceId());
+ assertEquals(exceptedTablets.get("root.test_sg.test_d1").getRowSize(),
tablets.get("root.test_sg.test_d1").getRowSize());
}
public static class TestAbstractIoTDBProcessor extends AbstractIoTDB {
diff --git
a/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/PutIoTDBRecordTest.java
b/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/PutIoTDBRecordTest.java
index 5fe2cc3e72..f26b0cb152 100755
---
a/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/PutIoTDBRecordTest.java
+++
b/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/PutIoTDBRecordTest.java
@@ -19,9 +19,9 @@ package org.apache.nifi.processors;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.processors.model.DatabaseSchema;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.junit.jupiter.api.Test;
diff --git a/nifi-extension-bundles/nifi-iotdb-bundle/pom.xml
b/nifi-extension-bundles/nifi-iotdb-bundle/pom.xml
index 6b927ea97d..21289be982 100644
--- a/nifi-extension-bundles/nifi-iotdb-bundle/pom.xml
+++ b/nifi-extension-bundles/nifi-iotdb-bundle/pom.xml
@@ -30,7 +30,7 @@
</modules>
<properties>
- <iotdb.sdk.version>1.3.2</iotdb.sdk.version>
+ <iotdb.sdk.version>2.0.2</iotdb.sdk.version>
</properties>
</project>