This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 792cf133c31 UDF: add an udf function for envelope demodulation 
analysis (#12146)
792cf133c31 is described below

commit 792cf133c317e560e37cf055f2f98c445a8deb46
Author: Zhijia Cao <[email protected]>
AuthorDate: Thu Mar 14 17:29:54 2024 +0800

    UDF: add an udf function for envelope demodulation analysis (#12146)
    
    This commit adds a UDF for envelope demodulation analysis. Given a 
one-dimensional series of floating-point samples and a modulation frequency 
specified by the user, the function demodulates the signal and extracts its 
envelope. The goal of demodulation is to extract the parts of interest from a 
complex signal and make them easier to understand. For example, demodulation 
can find the envelope of the signal, that is, the tr [...]
---
 .../iotdb/libudf/it/dprofile/DProfileIT.java       |  18 ++
 library-udf/pom.xml                                |   5 +
 .../library/frequency/UDFEnvelopeAnalysis.java     | 248 +++++++++++++++++++++
 .../org/apache/iotdb/library/UDFEnvelopeTest.java  | 121 ++++++++++
 4 files changed, 392 insertions(+)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/libudf/it/dprofile/DProfileIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/libudf/it/dprofile/DProfileIT.java
index ef1c0c1719f..72d23ec5e42 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/libudf/it/dprofile/DProfileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/libudf/it/dprofile/DProfileIT.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.LocalStandaloneIT;
 
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -147,6 +148,8 @@ public class DProfileIT {
           "create function timeliness as 
'org.apache.iotdb.library.dquality.UDTFTimeliness'");
       statement.execute(
           "create function completeness as 
'org.apache.iotdb.library.dquality.UDTFCompleteness'");
+      statement.execute(
+          "CREATE FUNCTION envelope AS 
'org.apache.iotdb.library.frequency.UDFEnvelopeAnalysis'");
     } catch (SQLException throwable) {
       fail(throwable.getMessage());
     }
@@ -572,4 +575,19 @@ public class DProfileIT {
       fail(throwable.getMessage());
     }
   }
+
+  /**
+   * Runs the {@code envelope} UDF over {@code s1} of every matched series with a modulation
+   * frequency of 10 and checks each returned value (result column 2) against 1.4365 within 0.01.
+   *
+   * <p>The result set is closed together with the statement and connection, and the test fails
+   * when the query yields no rows so that the value check cannot pass vacuously.
+   */
+  @Test
+  public void testEnvelope() {
+    String sqlStr = "select envelope(s1,'frequency'='10') from root.**";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sqlStr)) {
+      int rowCount = 0;
+      while (resultSet.next()) {
+        rowCount++;
+        // column 1 is the timestamp, column 2 the UDF output
+        Assert.assertEquals(1.4365, resultSet.getDouble(2), 0.01);
+      }
+      Assert.assertTrue("envelope query returned no rows", rowCount > 0);
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
 }
diff --git a/library-udf/pom.xml b/library-udf/pom.xml
index c6e28fad4cf..b1ad7697d3f 100644
--- a/library-udf/pom.xml
+++ b/library-udf/pom.xml
@@ -79,6 +79,11 @@
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <profiles>
         <profile>
diff --git 
a/library-udf/src/main/java/org/apache/iotdb/library/frequency/UDFEnvelopeAnalysis.java
 
b/library-udf/src/main/java/org/apache/iotdb/library/frequency/UDFEnvelopeAnalysis.java
new file mode 100644
index 00000000000..6d04a108640
--- /dev/null
+++ 
b/library-udf/src/main/java/org/apache/iotdb/library/frequency/UDFEnvelopeAnalysis.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.library.frequency;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import 
org.apache.iotdb.udf.api.exception.UDFOutputSeriesDataTypeNotValidException;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.apache.commons.math3.complex.Complex;
+import org.eclipse.collections.impl.list.mutable.primitive.DoubleArrayList;
+import org.eclipse.collections.impl.list.mutable.primitive.LongArrayList;
+import org.jtransforms.fft.DoubleFFT_1D;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class UDFEnvelopeAnalysis implements UDTF {
+  private double frequency;
+  private int amplification;
+  private String timestampPrecision;
+  private final DoubleArrayList signals = new DoubleArrayList();
+  private final LongArrayList timestamps = new LongArrayList();
+  private static final String TIMESTAMP_PRECISION = "timestampPrecision";
+  private static final String FREQUENCY = "frequency";
+  private static final String AMPLIFICATION = "amplification";
+  public static final String MS_PRECISION = "ms";
+  public static final String US_PRECISION = "us";
+  public static final String NS_PRECISION = "ns";
+
+  @Override
+  public void validate(UDFParameterValidator validator) throws Exception {
+    validator
+        .validateInputSeriesNumber(1)
+        .validateInputSeriesDataType(0, Type.DOUBLE, Type.FLOAT, Type.INT32, 
Type.INT64)
+        .validate(
+            x -> (double) x > 0,
+            "The param 'frequency' must > 0.",
+            validator.getParameters().getDoubleOrDefault(FREQUENCY, 0))
+        .validate(
+            x -> (int) x >= 1,
+            "The param 'amplification' must >= 1.",
+            validator.getParameters().getIntOrDefault(AMPLIFICATION, 1));
+  }
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations 
configurations)
+      throws Exception {
+    configurations.setAccessStrategy(new 
RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
+    frequency = parameters.getDoubleOrDefault(FREQUENCY, Double.MIN_VALUE);
+    amplification = parameters.getIntOrDefault(AMPLIFICATION, 1);
+    timestampPrecision = 
parameters.getSystemStringOrDefault(TIMESTAMP_PRECISION, MS_PRECISION);
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    signals.add(getValueAsDouble(row, 0));
+    if (timestamps.size() < 10) {
+      timestamps.add(row.getTime());
+    }
+  }
+
+  @Override
+  public void terminate(PointCollector collector) throws Exception {
+    double[] envelopeValues = envelopeAnalyze(signals.toArray());
+    frequency = frequency != Double.MIN_VALUE ? frequency : 
calculateFrequency(timestamps);
+    int signalSize = signals.size();
+    double[] frequencies = new double[signalSize / 2];
+    for (int i = 0; i < signalSize / 2; i++) {
+      frequencies[i] = i * (frequency * amplification / signalSize);
+    }
+
+    for (int i = 0; i < envelopeValues.length; i++) {
+      collector.putDouble((long) frequencies[i], envelopeValues[i]);
+    }
+  }
+
+  public double[] envelopeAnalyze(double[] signals) {
+    Complex[] hilbertTransformed = calculateHilbert(signals);
+    double[] hilbertAbs = calculateAbs(hilbertTransformed);
+    double[] fftTransformed = calculateFFT(hilbertAbs);
+    return calculateEnvelope(signals.length, fftTransformed);
+  }
+
+  public Complex[] calculateHilbert(double[] timeDomainSignal) {
+    int signalSize = timeDomainSignal.length;
+    // 1. FFT transformer setup
+    DoubleFFT_1D fftTransformer = new DoubleFFT_1D(signalSize);
+
+    // 2. array for FFT output (real + imaginary)
+    double[] frequencyDomainValues = new double[signalSize * 2];
+
+    // 3. copy signal into FFT input array
+    System.arraycopy(timeDomainSignal, 0, frequencyDomainValues, 0, 
signalSize);
+
+    // 4. forward FFT to get frequency representation
+    fftTransformer.realForwardFull(frequencyDomainValues);
+
+    // 5. hilbert filter to zero negative frequencies and double positive 
ones, and adjust filter
+    // for DC and Nyquist when signalSize is even
+    double[] hilbertFilter = new double[signalSize];
+    if (signalSize % 2 == 0) {
+      hilbertFilter[0] = hilbertFilter[signalSize / 2] = 1;
+      Arrays.fill(hilbertFilter, 1, signalSize / 2, 2);
+    } else {
+      // adjust filter for DC when signalSize is odd
+      hilbertFilter[0] = 1;
+      Arrays.fill(hilbertFilter, 1, (signalSize + 1) / 2, 2);
+    }
+
+    // 6. apply Hilbert filter
+    for (int i = 0; i < signalSize; i++) {
+      frequencyDomainValues[2 * i] *= hilbertFilter[i];
+      frequencyDomainValues[2 * i + 1] *= hilbertFilter[i];
+    }
+
+    // 7. inverse FFT to time domain
+    fftTransformer.complexInverse(frequencyDomainValues, true);
+
+    // 8. form analytic signal
+    Complex[] analyticSignals = new Complex[signalSize];
+    for (int i = 0; i < signalSize; i++) {
+      analyticSignals[i] =
+          new Complex(frequencyDomainValues[2 * i], frequencyDomainValues[2 * 
i + 1]);
+    }
+
+    return analyticSignals;
+  }
+
+  private double[] calculateAbs(Complex[] complexNumbers) {
+    double[] magnitudes = new double[complexNumbers.length];
+    for (int i = 0; i < complexNumbers.length; i++) {
+      magnitudes[i] = complexNumbers[i].abs();
+    }
+    return magnitudes;
+  }
+
+  private double[] calculateFFT(double[] realValues) {
+    DoubleFFT_1D fftTransformer = new DoubleFFT_1D(realValues.length);
+    double[] fftComplex = new double[realValues.length * 2];
+    System.arraycopy(realValues, 0, fftComplex, 0, realValues.length);
+    fftTransformer.realForwardFull(fftComplex);
+    return fftComplex;
+  }
+
+  private double[] calculateEnvelope(int originalLength, double[] fftValues) {
+    double[] envelope = new double[originalLength / 2];
+    for (int i = 0; i < envelope.length; i++) {
+      int realIndex = 2 * i;
+      int imagIndex = realIndex + 1;
+      envelope[i] =
+          Math.sqrt(
+                  fftValues[realIndex] * fftValues[realIndex]
+                      + fftValues[imagIndex] * fftValues[imagIndex])
+              / originalLength;
+    }
+    return envelope;
+  }
+
+  public double calculateFrequency(LongArrayList timestamps) {
+    LongArrayList timeDifferences = calculateTimeDifferences(timestamps);
+    long modeTimeDifference = calculateMode(timeDifferences);
+    return calculateFrequencyByTimeUnit(modeTimeDifference, 
timestampPrecision);
+  }
+
+  public LongArrayList calculateTimeDifferences(LongArrayList timestamps) {
+    LongArrayList timeDifferences = new LongArrayList();
+    for (int i = 1; i < timestamps.size(); i++) {
+      timeDifferences.add(timestamps.get(i) - timestamps.get(i - 1));
+    }
+    return timeDifferences;
+  }
+
+  public long calculateMode(LongArrayList timestamps) {
+    Map<Long, Integer> countMap = new HashMap<>();
+    int maxCount = 0;
+    long modeTimeDifference = 0L;
+
+    for (long diff : timestamps.toArray()) {
+      int count = countMap.getOrDefault(diff, 0) + 1;
+      countMap.put(diff, count);
+      if (count > maxCount) {
+        maxCount = count;
+        modeTimeDifference = diff;
+      }
+    }
+    return modeTimeDifference;
+  }
+
+  public double getValueAsDouble(Row row, int index) throws IOException {
+    double ans;
+    switch (row.getDataType(index)) {
+      case INT32:
+        ans = row.getInt(index);
+        break;
+      case INT64:
+        ans = row.getLong(index);
+        break;
+      case FLOAT:
+        ans = row.getFloat(index);
+        break;
+      case DOUBLE:
+        ans = row.getDouble(index);
+        break;
+      default:
+        throw new UDFOutputSeriesDataTypeNotValidException(
+            index, "Fail to get data type in row " + row.getTime());
+    }
+    return ans;
+  }
+
+  public static double calculateFrequencyByTimeUnit(long time, String 
timeUnit) {
+    switch (timeUnit) {
+      case MS_PRECISION:
+        return 1000.0 / time;
+      case US_PRECISION:
+        return 1_000_000.0 / time;
+      case NS_PRECISION:
+        return 1_000_000_000.0 / time;
+      default:
+        throw new IllegalArgumentException("Unsupported time unit.");
+    }
+  }
+}
diff --git 
a/library-udf/src/test/java/org/apache/iotdb/library/UDFEnvelopeTest.java 
b/library-udf/src/test/java/org/apache/iotdb/library/UDFEnvelopeTest.java
new file mode 100644
index 00000000000..1ac26e99e78
--- /dev/null
+++ b/library-udf/src/test/java/org/apache/iotdb/library/UDFEnvelopeTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.library;
+
+import org.apache.iotdb.library.frequency.UDFEnvelopeAnalysis;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link UDFEnvelopeAnalysis#envelopeAnalyze(double[])}: each case feeds a
+ * ten-sample synthetic signal and compares the five returned values with fixed reference values.
+ */
+public class UDFEnvelopeTest {
+  /** Number of samples in every synthetic signal. */
+  private static final int SAMPLE_COUNT = 10;
+
+  /** Absolute tolerance used when comparing against the reference values. */
+  private static final double TOLERANCE = 0.01;
+
+  private final UDFEnvelopeAnalysis analysis = new UDFEnvelopeAnalysis();
+
+  /** Analyzes {@code vibData} and asserts the result matches {@code expectedEnvelope}. */
+  private void assertEnvelope(double[] vibData, double... expectedEnvelope) {
+    double[] actualEnvelope = analysis.envelopeAnalyze(vibData);
+    Assert.assertArrayEquals(expectedEnvelope, actualEnvelope, TOLERANCE);
+  }
+
+  @Test
+  public void testLinearIncreasing() {
+    double[] samples = new double[SAMPLE_COUNT];
+    for (int k = 0; k < samples.length; k++) {
+      samples[k] = k + 1;
+    }
+    assertEnvelope(samples, 6.2844, 1.5582, 0.8503, 0.5128, 0.2636);
+  }
+
+  @Test
+  public void testLinearDecreasing() {
+    double[] samples = new double[SAMPLE_COUNT];
+    for (int k = 0; k < samples.length; k++) {
+      samples[k] = 10 - k;
+    }
+    assertEnvelope(samples, 6.2844, 1.5582, 0.8503, 0.5128, 0.2636);
+  }
+
+  @Test
+  public void testQuadratic() {
+    double[] samples = new double[SAMPLE_COUNT];
+    for (int k = 0; k < samples.length; k++) {
+      samples[k] = Math.pow(k + 1, 2);
+    }
+    assertEnvelope(samples, 50.0465, 19.4957, 10.2427, 5.9496, 2.0566);
+  }
+
+  @Test
+  public void testSineWave() {
+    double[] samples = new double[SAMPLE_COUNT];
+    for (int k = 0; k < samples.length; k++) {
+      samples[k] = Math.sin(k * 2 * Math.PI / 9);
+    }
+    assertEnvelope(samples, 0.9379, 0.0738, 0.0457, 0.0394, 0.0203);
+  }
+
+  @Test
+  public void testCosineWave() {
+    double[] samples = new double[SAMPLE_COUNT];
+    for (int k = 0; k < samples.length; k++) {
+      samples[k] = Math.cos(k * 2 * Math.PI / 9);
+    }
+    assertEnvelope(samples, 1.0437, 0.0097, 0.0163, 0.0056, 0.0005);
+  }
+
+  @Test
+  public void testExponential() {
+    double[] samples = new double[SAMPLE_COUNT];
+    for (int k = 0; k < samples.length; k++) {
+      samples[k] = Math.exp(k + 1);
+    }
+    assertEnvelope(samples, 6789.997, 4310.316, 2098.5, 1531.1508, 441.3);
+  }
+
+  @Test
+  public void testLogarithmic() {
+    double[] samples = new double[SAMPLE_COUNT];
+    for (int k = 0; k < samples.length; k++) {
+      samples[k] = Math.log(k + 1);
+    }
+    assertEnvelope(samples, 1.7149, 0.2899, 0.1784, 0.1201, 0.0943);
+  }
+
+  @Test
+  public void testAmplitudeModulatedSineWave() {
+    double[] samples = new double[SAMPLE_COUNT];
+    for (int k = 0; k < samples.length; k++) {
+      samples[k] = (k + 1) * Math.sin(k * 2 * Math.PI / 9);
+    }
+    assertEnvelope(samples, 5.1938, 1.3139, 0.3249, 0.2471, 0.1196);
+  }
+
+  @Test
+  public void testDampedSineWave() {
+    double[] samples = new double[SAMPLE_COUNT];
+    for (int k = 0; k < samples.length; k++) {
+      samples[k] = Math.exp(-0.3 * (k + 1)) * Math.sin(k * 2 * Math.PI / 9);
+    }
+    assertEnvelope(samples, 0.2345, 0.0824, 0.0207, 0.0161, 0.0075);
+  }
+
+  @Test
+  public void testCompositeWaveform() {
+    double[] samples = new double[SAMPLE_COUNT];
+    for (int k = 0; k < samples.length; k++) {
+      samples[k] = Math.sin(k * 2 * Math.PI / 9) + Math.cos(k * 4 * Math.PI / 9);
+    }
+    assertEnvelope(samples, 1.2908, 0.3926, 0.0535, 0.0570, 0.0308);
+  }
+}

Reply via email to