This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch UTToIT in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b457c0a7afe01f3830465d78c3684a672e5c6c6f Author: JackieTien97 <[email protected]> AuthorDate: Mon Oct 24 15:42:07 2022 +0800 Change previous IT from server module to integration-test --- integration-test/import-control.xml | 9 + .../java/org/apache/iotdb/it/env/AbstractEnv.java | 6 + .../org/apache/iotdb/it/env/DataNodeWrapper.java | 8 + .../java/org/apache/iotdb/it/env/MppConfig.java | 30 +++ .../org/apache/iotdb/it/env/RemoteServerEnv.java | 5 + .../org/apache/iotdb/itbase/env/BaseConfig.java | 40 ++++ .../java/org/apache/iotdb/itbase/env/BaseEnv.java | 2 + .../org/apache/iotdb/db/it/env/StandaloneEnv.java | 5 + .../iotdb/db/it/env/StandaloneEnvConfig.java | 55 ++++++ .../apache/iotdb/db/it/mqtt/IoTDBMQTTSinkIT.java | 68 +++---- .../iotdb/db/it/watermark/IoTDBWatermarkIT.java | 179 ++++++++--------- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../iotdb/db/protocol/mqtt/PublishHandler.java | 4 - .../iotdb/db/protocol/mqtt/PublishHandlerTest.java | 132 ------------- .../apache/iotdb/db/sink/LocalIoTDBSinkTest.java | 218 --------------------- 15 files changed, 286 insertions(+), 477 deletions(-) diff --git a/integration-test/import-control.xml b/integration-test/import-control.xml index d33888162c..95c2749b8f 100644 --- a/integration-test/import-control.xml +++ b/integration-test/import-control.xml @@ -35,6 +35,11 @@ <allow pkg="org\.slf4j.*" regex="true" /> <subpackage name="db.it"> <disallow pkg="org.apache.iotdb.jdbc.*"/> + <allow class="org.apache.iotdb.db.tools.watermark.WatermarkDetector" /> + <allow class="io.netty.buffer.ByteBuf" /> + <allow class="io.netty.buffer.Unpooled" /> + <allow class="org.fusesource.mqtt.client.QoS" /> + <allow class="org.apache.iotdb.commons.path.PartialPath" /> <allow pkg="java.text"/> <allow pkg="org.apache.iotdb.db.it.utils" /> <allow pkg="org\.apache\.iotdb\.db\.it\.utils\.TestUtils.*" regex="true"/> @@ -46,6 +51,10 @@ <allow pkg="org\.apache\.iotdb\.tsfile\.read.*" regex="true" /> <allow pkg="org\.apache\.iotdb\.tsfile\.utils.*" regex="true" /> <allow pkg="org\.apache\.iotdb\.tsfile\.write.*" regex="true" /> + <allow pkg="org\.apache\.iotdb\.db\.protocol\.mqtt.*" regex="true" /> + <allow pkg="io\.moquette\.interception\.messages.*" regex="true" /> + <allow pkg="io\.netty\.handler\.codec\.mqtt.*" regex="true" /> + <allow pkg="org\.apache\.iotdb\.db\.engine\.trigger\.sink\.mqtt.*" regex="true" /> </subpackage> <subpackage name="confignode.it"> <allow class="java.nio.ByteBuffer" /> diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java index ffaa5898f3..994eb323c5 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java @@ -479,4 +479,10 @@ public abstract class AbstractEnv implements BaseEnv { public void shutdownDataNode(int index) { dataNodeWrapperList.get(index).stop(); } + + @Override + public int getMqttPort() { + int randomIndex = new Random(System.currentTimeMillis()).nextInt(dataNodeWrapperList.size()); + return dataNodeWrapperList.get(randomIndex).getMqttPort(); + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java index 27f34f275b..b11b829219 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java @@ -32,6 +32,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper { private final int internalPort; private final int dataRegionConsensusPort; private final int schemaRegionConsensusPort; + private final int mqttPort; public DataNodeWrapper( String targetConfigNode, String testClassName, String testMethodName, int[] portList) { @@ -41,6 +42,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper { this.internalPort = portList[2]; this.dataRegionConsensusPort = portList[3]; this.schemaRegionConsensusPort = portList[4]; + this.mqttPort = portList[5]; } @Override @@ -54,6 +56,8 @@ public class DataNodeWrapper extends AbstractNodeWrapper { "data_region_consensus_port", String.valueOf(this.dataRegionConsensusPort)); properties.setProperty( "schema_region_consensus_port", String.valueOf(this.schemaRegionConsensusPort)); + properties.setProperty("mqtt_host", super.getIp()); + properties.setProperty("mqtt_port", String.valueOf(this.mqttPort)); properties.setProperty("connection_timeout_ms", "30000"); if (this.targetConfigNode != null) { properties.setProperty(IoTDBConstant.TARGET_CONFIG_NODES, this.targetConfigNode); @@ -106,4 +110,8 @@ public class DataNodeWrapper extends AbstractNodeWrapper { public int getSchemaRegionConsensusPort() { return schemaRegionConsensusPort; } + + public int getMqttPort() { + return mqttPort; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java index a1c1b8dcfd..5db9045ba4 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java @@ -281,4 +281,34 @@ public class MppConfig implements BaseConfig { engineProperties.setProperty("max_degree_of_index_node", String.valueOf(maxDegreeOfIndexNode)); return this; } + + @Override + public BaseConfig setEnableWatermark(boolean enableWatermark) { + engineProperties.setProperty("watermark_module_opened", String.valueOf(enableWatermark)); + return this; + } + + @Override + public BaseConfig setWatermarkSecretKey(String watermarkSecretKey) { + engineProperties.setProperty("watermark_secret_key", watermarkSecretKey); + return this; + } + + @Override + public BaseConfig setWatermarkBitString(String watermarkBitString) { + engineProperties.setProperty("watermark_bit_string", watermarkBitString); + return this; + } + + @Override + public BaseConfig setWatermarkMethod(String watermarkMethod) { + engineProperties.setProperty("watermark_method", watermarkMethod); + return this; + } + + @Override + public BaseConfig setEnableMQTTService(boolean enableMQTTService) { + engineProperties.setProperty("enable_mqtt_service", String.valueOf(enableMQTTService)); + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java index f3d4ad410e..a7ac83d9de 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java @@ -182,4 +182,9 @@ public class RemoteServerEnv implements BaseEnv { public void shutdownDataNode(int index) { getDataNodeWrapperList().get(index).stop(); } + + @Override + public int getMqttPort() { + throw new UnsupportedOperationException(); + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java index 90a69dbc26..8b560e4068 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java @@ -286,4 +286,44 @@ public interface BaseConfig { default int getMaxDegreeOfIndexNode() { return 256; } + + default BaseConfig setEnableWatermark(boolean enableWatermark) { + return this; + } + + default boolean isEnableWatermark() { + return false; + } + + default String getWatermarkSecretKey() { + return "IoTDB*2019@Beijing"; + } + + default BaseConfig setWatermarkSecretKey(String watermarkSecretKey) { + return this; + } + + default String getWatermarkBitString() { + return "100101110100"; + } + + default BaseConfig setWatermarkBitString(String watermarkBitString) { + return this; + } + + default String getWatermarkMethod() { + return "GroupBasedLSBMethod(embed_row_cycle=2,embed_lsb_num=5)"; + } + + default BaseConfig setWatermarkMethod(String watermarkMethod) { + return this; + } + + default boolean isEnableMQTTService() { + return false; + } + + default BaseConfig setEnableMQTTService(boolean enableMQTTService) { + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java index 91ca65b99e..2f97f3b37c 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java @@ -143,4 +143,6 @@ public interface BaseEnv { void startDataNode(int index); void shutdownDataNode(int index); + + int getMqttPort(); } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java index 02210a36ca..c11a6cb937 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java @@ -202,4 +202,9 @@ public class StandaloneEnv implements BaseEnv { public void shutdownDataNode(int index) { // Do nothing } + + @Override + public int getMqttPort() { + return 1883; + } } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java index c40e3141d2..9157c6922e 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java @@ -271,4 +271,59 @@ public class StandaloneEnvConfig implements BaseConfig { public int getMaxDegreeOfIndexNode() { return TSFileDescriptor.getInstance().getConfig().getMaxDegreeOfIndexNode(); } + + @Override + public BaseConfig setEnableWatermark(boolean enableWatermark) { + IoTDBDescriptor.getInstance().getConfig().setEnableWatermark(enableWatermark); + return this; + } + + @Override + public boolean isEnableWatermark() { + return IoTDBDescriptor.getInstance().getConfig().isEnableWatermark(); + } + + @Override + public String getWatermarkSecretKey() { + return IoTDBDescriptor.getInstance().getConfig().getWatermarkSecretKey(); + } + + @Override + public BaseConfig setWatermarkSecretKey(String watermarkSecretKey) { + IoTDBDescriptor.getInstance().getConfig().setWatermarkSecretKey(watermarkSecretKey); + return this; + } + + @Override + public String getWatermarkBitString() { + return IoTDBDescriptor.getInstance().getConfig().getWatermarkBitString(); + } + + @Override + public BaseConfig setWatermarkBitString(String watermarkBitString) { + IoTDBDescriptor.getInstance().getConfig().setWatermarkBitString(watermarkBitString); + return this; + } + + @Override + public String getWatermarkMethod() { + return IoTDBDescriptor.getInstance().getConfig().getWatermarkMethod(); + } + + @Override + public BaseConfig setWatermarkMethod(String watermarkMethod) { + IoTDBDescriptor.getInstance().getConfig().setWatermarkMethod(watermarkMethod); + return this; + } + + @Override + public boolean isEnableMQTTService() { + return IoTDBDescriptor.getInstance().getConfig().isEnableMQTTService(); + } + + @Override + public BaseConfig setEnableMQTTService(boolean enableMQTTService) { + IoTDBDescriptor.getInstance().getConfig().setEnableMQTTService(enableMQTTService); + return this; + } } diff --git a/server/src/test/java/org/apache/iotdb/db/sink/MQTTSinkTest.java b/integration-test/src/test/java/org/apache/iotdb/db/it/mqtt/IoTDBMQTTSinkIT.java similarity index 76% rename from server/src/test/java/org/apache/iotdb/db/sink/MQTTSinkTest.java rename to integration-test/src/test/java/org/apache/iotdb/db/it/mqtt/IoTDBMQTTSinkIT.java index 6ae304ad96..d4c6bccf77 100644 --- a/server/src/test/java/org/apache/iotdb/db/sink/MQTTSinkTest.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/mqtt/IoTDBMQTTSinkIT.java @@ -16,25 +16,26 @@ * specific language governing permissions and limitations * under the License. */ - -package org.apache.iotdb.db.sink; +package org.apache.iotdb.db.it.mqtt; import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.trigger.sink.mqtt.MQTTConfiguration; import org.apache.iotdb.db.engine.trigger.sink.mqtt.MQTTEvent; import org.apache.iotdb.db.engine.trigger.sink.mqtt.MQTTHandler; -import org.apache.iotdb.db.utils.EnvironmentUtils; -import org.apache.iotdb.jdbc.Config; +import org.apache.iotdb.it.env.ConfigFactory; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; import org.fusesource.mqtt.client.QoS; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; import java.sql.Connection; -import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -42,24 +43,28 @@ import java.sql.Statement; import java.sql.Types; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; -import static java.util.concurrent.TimeUnit.MINUTES; -import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; -@SuppressWarnings("squid:S2925") -public class MQTTSinkTest { +@RunWith(IoTDBTestRunner.class) +@Category({ClusterIT.class}) +public class IoTDBMQTTSinkIT { + + private boolean enableMQTTService; @Before public void setUp() throws Exception { - IoTDBDescriptor.getInstance().getConfig().setEnableMQTTService(true); - EnvironmentUtils.envSetUp(); + enableMQTTService = ConfigFactory.getConfig().isEnableMQTTService(); + ConfigFactory.getConfig().setEnableMQTTService(true); + EnvFactory.getEnv().initBeforeTest(); } @After public void tearDown() throws Exception { - EnvironmentUtils.cleanEnv(); + EnvFactory.getEnv().cleanAfterTest(); + ConfigFactory.getConfig().setEnableMQTTService(enableMQTTService); } @Test @@ -68,31 +73,29 @@ public class MQTTSinkTest { mqttHandler.open( new MQTTConfiguration( "127.0.0.1", - 1883, + EnvFactory.getEnv().getMqttPort(), "root", "root", new PartialPath("root.sg1.d1"), new String[] {"s1"})); - for (int i = 0; i < 10000; ++i) { + for (int i = 0; i < 5; ++i) { mqttHandler.onEvent(new MQTTEvent("test", QoS.EXACTLY_ONCE, false, i, i)); } mqttHandler.close(); - await().atMost(1, MINUTES).until(() -> 10000 == checkSingleSensorHandlerResult()); + TimeUnit.SECONDS.sleep(10); + + assertEquals(5, checkSingleSensorHandlerResult()); } - private int checkSingleSensorHandlerResult() throws ClassNotFoundException { + private int checkSingleSensorHandlerResult() { int count = 0; - Class.forName(Config.JDBC_DRIVER_NAME); - try (Connection connection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + try (Connection connection = EnvFactory.getEnv().getConnection(); Statement statement = connection.createStatement()) { - Assert.assertTrue(statement.execute("select * from root.**")); + try (ResultSet resultSet = statement.executeQuery("select * from root.**")) { - try (ResultSet resultSet = statement.getResultSet()) { ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); checkHeader( @@ -122,13 +125,13 @@ public class MQTTSinkTest { mqttHandler.open( new MQTTConfiguration( "127.0.0.1", - 1883, + EnvFactory.getEnv().getMqttPort(), "root", "root", new PartialPath("root.sg1.d1"), new String[] {"s1", "s2", "s3", "s4", "s5", "s6"})); - for (int i = 0; i < 10000; ++i) { + for (int i = 0; i < 5; ++i) { mqttHandler.onEvent( new MQTTEvent( "test", @@ -145,19 +148,18 @@ public class MQTTSinkTest { mqttHandler.close(); - await().atMost(1, MINUTES).until(() -> 10000 == checkMultiSensorsHandlerResult()); + TimeUnit.SECONDS.sleep(10); + + assertEquals(5, checkMultiSensorsHandlerResult()); } - private int checkMultiSensorsHandlerResult() throws ClassNotFoundException { + private int checkMultiSensorsHandlerResult() { int count = 0; - Class.forName(Config.JDBC_DRIVER_NAME); - try (Connection connection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + try (Connection connection = EnvFactory.getEnv().getConnection(); Statement statement = connection.createStatement()) { - Assert.assertTrue(statement.execute("select * from root.**")); - try (ResultSet resultSet = statement.getResultSet()) { + try (ResultSet resultSet = statement.executeQuery("select * from root.**")) { + ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); checkHeader( diff --git a/server/src/test/java/org/apache/iotdb/db/tools/IoTDBWatermarkTest.java b/integration-test/src/test/java/org/apache/iotdb/db/it/watermark/IoTDBWatermarkIT.java similarity index 53% rename from server/src/test/java/org/apache/iotdb/db/tools/IoTDBWatermarkTest.java rename to integration-test/src/test/java/org/apache/iotdb/db/it/watermark/IoTDBWatermarkIT.java index 30a09168e5..1b54f3cf67 100644 --- a/server/src/test/java/org/apache/iotdb/db/tools/IoTDBWatermarkTest.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/watermark/IoTDBWatermarkIT.java @@ -16,41 +16,40 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.tools; +package org.apache.iotdb.db.it.watermark; -import org.apache.iotdb.commons.conf.IoTDBConstant; -import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.constant.TestConstant; -import org.apache.iotdb.db.exception.query.LogicalOperatorException; import org.apache.iotdb.db.tools.watermark.WatermarkDetector; -import org.apache.iotdb.db.utils.EnvironmentUtils; -import org.apache.iotdb.jdbc.Config; +import org.apache.iotdb.it.env.ConfigFactory; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.constant.TestConstant; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; import java.io.File; -import java.io.IOException; import java.io.PrintWriter; import java.sql.Connection; -import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; +import static org.apache.iotdb.itbase.constant.TestConstant.BASE_OUTPUT_PATH; import static org.junit.Assert.fail; -/** - * Notice that, all test begins with "IoTDB" is integration test. All test which will start the - * IoTDB server should be defined as integration test. - */ -public class IoTDBWatermarkTest { +@Ignore // TODO add it back when we support watermark in mpp mode +@RunWith(IoTDBTestRunner.class) +@Category({ClusterIT.class}) +public class IoTDBWatermarkIT { - private static String filePath1 = - TestConstant.BASE_OUTPUT_PATH.concat("watermarked_query_result.csv"); - private static String filePath2 = - TestConstant.BASE_OUTPUT_PATH.concat("notWatermarked_query_result.csv"); + private static final String filePath1 = BASE_OUTPUT_PATH.concat("watermarked_query_result.csv"); + private static final String filePath2 = + BASE_OUTPUT_PATH.concat(File.separator).concat("notWatermarked_query_result.csv"); private static PrintWriter writer1; private static PrintWriter writer2; private static String secretKey = "ASDFGHJKL"; @@ -58,19 +57,33 @@ public class IoTDBWatermarkTest { private static int embed_row_cycle = 5; private static int embed_lsb_num = 5; + private boolean originEnableWatermark; + + private String originWatermarkSecretKey; + + private String originWatermarkBitString; + + private String originWatermarkMethod; + @Before public void setUp() throws Exception { - IoTDBDescriptor.getInstance().getConfig().setEnableWatermark(true); // default false - IoTDBDescriptor.getInstance().getConfig().setWatermarkSecretKey(secretKey); - IoTDBDescriptor.getInstance().getConfig().setWatermarkBitString(watermarkBitString); - IoTDBDescriptor.getInstance() - .getConfig() + + originEnableWatermark = ConfigFactory.getConfig().isEnableWatermark(); + originWatermarkSecretKey = ConfigFactory.getConfig().getWatermarkSecretKey(); + originWatermarkBitString = ConfigFactory.getConfig().getWatermarkBitString(); + originWatermarkMethod = ConfigFactory.getConfig().getWatermarkMethod(); + + ConfigFactory.getConfig().setEnableWatermark(true); + ConfigFactory.getConfig().setWatermarkSecretKey(secretKey); + ConfigFactory.getConfig().setWatermarkBitString(watermarkBitString); + ConfigFactory.getConfig() .setWatermarkMethod( String.format( "GroupBasedLSBMethod" + "(embed_row_cycle=%d,embed_lsb_num=%d)", embed_row_cycle, embed_lsb_num)); - EnvironmentUtils.envSetUp(); + EnvFactory.getEnv().initBeforeTest(); + insertData(); File file1 = new File(filePath1); @@ -98,14 +111,16 @@ public class IoTDBWatermarkTest { if (file2.exists()) { file2.delete(); } - EnvironmentUtils.cleanEnv(); + EnvFactory.getEnv().cleanAfterTest(); + + ConfigFactory.getConfig().setEnableWatermark(originEnableWatermark); + ConfigFactory.getConfig().setWatermarkSecretKey(originWatermarkSecretKey); + ConfigFactory.getConfig().setWatermarkBitString(originWatermarkBitString); + ConfigFactory.getConfig().setWatermarkMethod(originWatermarkMethod); } - private static void insertData() throws ClassNotFoundException { - Class.forName(Config.JDBC_DRIVER_NAME); - try (Connection connection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + private static void insertData() { + try (Connection connection = EnvFactory.getEnv().getConnection(); Statement statement = connection.createStatement()) { String[] create_sql = @@ -123,7 +138,7 @@ public class IoTDBWatermarkTest { String sql = String.format( "insert into root.vehicle.d0(timestamp,s0,s2) values(%s,%s,%s)", - time, time % 50, time % 50, time % 50); + time, time % 50, time % 50); statement.execute(sql); if (time % 10 == 0) { sql = @@ -139,36 +154,24 @@ public class IoTDBWatermarkTest { } @Test - public void EncodeAndDecodeTest1() - throws IOException, ClassNotFoundException, LogicalOperatorException { + public void EncodeAndDecodeTest1() { // Watermark Embedding - Class.forName(Config.JDBC_DRIVER_NAME); - try (Connection connection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + try (Connection connection = EnvFactory.getEnv().getConnection(); Statement statement = connection.createStatement()) { statement.execute("GRANT WATERMARK_EMBEDDING TO root"); - boolean hasResultSet = statement.execute("SELECT s0,s1,s2 FROM root.vehicle.d0"); - Assert.assertTrue(hasResultSet); - ResultSet resultSet = statement.getResultSet(); - try { + try (ResultSet resultSet = statement.executeQuery("SELECT s0,s1,s2 FROM root.vehicle.d0")) { while (resultSet.next()) { String ans = resultSet.getString(TestConstant.TIMESTAMP_STR) + "," - + resultSet.getString( - TestConstant.d0 + IoTDBConstant.PATH_SEPARATOR + TestConstant.s0) + + resultSet.getString(TestConstant.d0 + "." + TestConstant.s0) + "," - + resultSet.getString( - TestConstant.d0 + IoTDBConstant.PATH_SEPARATOR + TestConstant.s1) + + resultSet.getString(TestConstant.d0 + "." + TestConstant.s1) + "," - + resultSet.getString( - TestConstant.d0 + IoTDBConstant.PATH_SEPARATOR + TestConstant.s2); + + resultSet.getString(TestConstant.d0 + "." + TestConstant.s2); writer1.println(ans); } writer1.close(); - } finally { - resultSet.close(); } } catch (Exception e) { e.printStackTrace(); @@ -178,50 +181,43 @@ public class IoTDBWatermarkTest { // Watermark Detection double alpha = 0.1; int columnIndex = 1; - boolean isWatermarked = - WatermarkDetector.isWatermarked( - filePath1, - secretKey, - watermarkBitString, - embed_row_cycle, - embed_lsb_num, - alpha, - columnIndex, - "int"); - Assert.assertTrue(isWatermarked); + try { + boolean isWatermarked = + WatermarkDetector.isWatermarked( + filePath1, + secretKey, + watermarkBitString, + embed_row_cycle, + embed_lsb_num, + alpha, + columnIndex, + "int"); + Assert.assertTrue(isWatermarked); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } } @Test - public void EncodeAndDecodeTest2() - throws IOException, ClassNotFoundException, LogicalOperatorException { + public void EncodeAndDecodeTest2() { // No Watermark Embedding - Class.forName(Config.JDBC_DRIVER_NAME); - try (Connection connection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + try (Connection connection = EnvFactory.getEnv().getConnection(); Statement statement = connection.createStatement()) { statement.execute("REVOKE WATERMARK_EMBEDDING FROM root"); - boolean hasResultSet = statement.execute("SELECT s0,s1,s2 FROM root.vehicle.d0"); - Assert.assertTrue(hasResultSet); - ResultSet resultSet = statement.getResultSet(); - try { + try (ResultSet resultSet = statement.executeQuery("SELECT s0,s1,s2 FROM root.vehicle.d0")) { while (resultSet.next()) { String ans = resultSet.getString(TestConstant.TIMESTAMP_STR) + "," - + resultSet.getString( - TestConstant.d0 + IoTDBConstant.PATH_SEPARATOR + TestConstant.s0) + + resultSet.getString(TestConstant.d0 + "." + TestConstant.s0) + "," - + resultSet.getString( - TestConstant.d0 + IoTDBConstant.PATH_SEPARATOR + TestConstant.s1) + + resultSet.getString(TestConstant.d0 + "." + TestConstant.s1) + "," - + resultSet.getString( - TestConstant.d0 + IoTDBConstant.PATH_SEPARATOR + TestConstant.s2); + + resultSet.getString(TestConstant.d0 + "." + TestConstant.s2); writer2.println(ans); } writer2.close(); - } finally { - resultSet.close(); } } catch (Exception e) { e.printStackTrace(); @@ -231,16 +227,21 @@ public class IoTDBWatermarkTest { // Watermark Detection double alpha = 0.1; int columnIndex = 1; - boolean isWatermarked = - WatermarkDetector.isWatermarked( - filePath2, - secretKey, - watermarkBitString, - embed_row_cycle, - embed_lsb_num, - alpha, - columnIndex, - "int"); - Assert.assertFalse(isWatermarked); + try { + boolean isWatermarked = + WatermarkDetector.isWatermarked( + filePath2, + secretKey, + watermarkBitString, + embed_row_cycle, + embed_lsb_num, + alpha, + columnIndex, + "int"); + Assert.assertFalse(isWatermarked); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } } } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 72c006533b..16a2890956 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -2123,7 +2123,7 @@ public class IoTDBConfig { this.watermarkBitString = watermarkBitString; } - String getWatermarkMethod() { + public String getWatermarkMethod() { return this.watermarkMethod; } diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java index 7fb4ee82b9..2a1f544101 100644 --- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java +++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java @@ -53,10 +53,6 @@ public class PublishHandler extends AbstractInterceptHandler { this.payloadFormat = PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter()); } - protected PublishHandler(PayloadFormatter payloadFormat) { - this.payloadFormat = payloadFormat; - } - @Override public String getID() { return "iotdb-mqtt-broker-listener"; diff --git a/server/src/test/java/org/apache/iotdb/db/protocol/mqtt/PublishHandlerTest.java b/server/src/test/java/org/apache/iotdb/db/protocol/mqtt/PublishHandlerTest.java deleted file mode 100644 index f1b1a20b1a..0000000000 --- a/server/src/test/java/org/apache/iotdb/db/protocol/mqtt/PublishHandlerTest.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.iotdb.db.protocol.mqtt; - -import org.apache.iotdb.db.utils.EnvironmentUtils; -import org.apache.iotdb.jdbc.Config; - -import io.moquette.interception.messages.InterceptConnectMessage; -import io.moquette.interception.messages.InterceptDisconnectMessage; -import io.moquette.interception.messages.InterceptPublishMessage; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.handler.codec.mqtt.MqttConnectMessage; -import io.netty.handler.codec.mqtt.MqttConnectPayload; -import io.netty.handler.codec.mqtt.MqttFixedHeader; -import io.netty.handler.codec.mqtt.MqttMessageType; -import io.netty.handler.codec.mqtt.MqttPublishMessage; -import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; -import io.netty.handler.codec.mqtt.MqttQoS; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.nio.charset.StandardCharsets; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.Statement; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -public class PublishHandlerTest { - - @BeforeClass - public static void setUp() throws Exception { - EnvironmentUtils.envSetUp(); - } - - @AfterClass - public static void tearDown() throws Exception { - EnvironmentUtils.cleanEnv(); - } - - @Test - public void onPublish() throws ClassNotFoundException { - PayloadFormatter payloadFormat = PayloadFormatManager.getPayloadFormat("json"); - PublishHandler handler = new PublishHandler(payloadFormat); - String clientId = "clientId"; - - String payload = - "{\n" - + "\"device\":\"root.sg.d1\",\n" - + "\"timestamp\":1586076045524,\n" - + "\"measurements\":[\"s1\"],\n" - + "\"values\":[0.530635]\n" - + "}"; - - ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8); - - // connect - MqttConnectPayload mqttConnectPayload = - new MqttConnectPayload( - clientId, - null, - "test".getBytes(StandardCharsets.UTF_8), - "root", - "root".getBytes(StandardCharsets.UTF_8)); - MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(null, null, mqttConnectPayload); - InterceptConnectMessage interceptConnectMessage = - new InterceptConnectMessage(mqttConnectMessage); - handler.onConnect(interceptConnectMessage); - - // publish - MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("root.sg.d1", 1); - MqttFixedHeader fixedHeader = - new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 1); - MqttPublishMessage publishMessage = new MqttPublishMessage(fixedHeader, variableHeader, buf); - InterceptPublishMessage message = new InterceptPublishMessage(publishMessage, clientId, null); - handler.onPublish(message); - - // disconnect - InterceptDisconnectMessage interceptDisconnectMessage = - new InterceptDisconnectMessage(clientId, null); - handler.onDisconnect(interceptDisconnectMessage); - - String[] retArray = new String[] {"1586076045524,0.530635,"}; - - Class.forName(Config.JDBC_DRIVER_NAME); - try (Connection connection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); - Statement statement = connection.createStatement()) { - boolean hasResultSet = statement.execute("select * from root.sg.d1"); - Assert.assertTrue(hasResultSet); - - try (ResultSet resultSet = statement.getResultSet()) { - ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); - int cnt = 0; - while (resultSet.next()) { - StringBuilder builder = new StringBuilder(); - for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { - builder.append(resultSet.getString(i)).append(","); - } - assertEquals(retArray[cnt], builder.toString()); - cnt++; - } - assertEquals(retArray.length, cnt); - } - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } -} diff --git a/server/src/test/java/org/apache/iotdb/db/sink/LocalIoTDBSinkTest.java b/server/src/test/java/org/apache/iotdb/db/sink/LocalIoTDBSinkTest.java deleted file mode 100644 index 83f2c273d8..0000000000 --- a/server/src/test/java/org/apache/iotdb/db/sink/LocalIoTDBSinkTest.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.sink; - -import org.apache.iotdb.db.engine.trigger.sink.local.LocalIoTDBConfiguration; -import org.apache.iotdb.db.engine.trigger.sink.local.LocalIoTDBEvent; -import org.apache.iotdb.db.engine.trigger.sink.local.LocalIoTDBHandler; -import org.apache.iotdb.db.exception.query.QueryProcessException; -import org.apache.iotdb.db.utils.EnvironmentUtils; -import org.apache.iotdb.jdbc.Config; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.utils.Binary; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Types; -import java.util.HashMap; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -public class LocalIoTDBSinkTest { - - @Before - public void setUp() throws Exception { - EnvironmentUtils.envSetUp(); - } - - @After - public void tearDown() throws Exception { - EnvironmentUtils.cleanEnv(); - } - - @Test - public void onEventUsingSingleSensorHandler() throws Exception { - LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler(); - localIoTDBHandler.open( - new LocalIoTDBConfiguration( - "root.sg1.d1", new String[] {"s1"}, new TSDataType[] {TSDataType.INT32})); - - for (int i = 0; i < 10000; ++i) { - localIoTDBHandler.onEvent(new LocalIoTDBEvent(i, i)); - } - - localIoTDBHandler.close(); - - Class.forName(Config.JDBC_DRIVER_NAME); - try (Connection connection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); - Statement statement = connection.createStatement()) { - Assert.assertTrue(statement.execute("select * from root.**")); - - try (ResultSet resultSet = statement.getResultSet()) { - ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); - - checkHeader( - resultSetMetaData, - "Time,root.sg1.d1.s1,", - new int[] { - Types.TIMESTAMP, Types.INTEGER, - }); - - int count = 0; - while (resultSet.next()) { - for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { - assertEquals(count, Double.parseDouble(resultSet.getString(i)), 0.0); - } - count++; - } - Assert.assertEquals(10000, count); - } - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void onEventUsingMultiSensorsHandler() throws Exception { - LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler(); - localIoTDBHandler.open( - new LocalIoTDBConfiguration( - "root.sg1.d1", - new String[] {"s1", "s2", "s3", "s4", "s5", "s6"}, - new TSDataType[] { - TSDataType.INT32, - TSDataType.INT64, - TSDataType.FLOAT, - TSDataType.DOUBLE, - TSDataType.BOOLEAN, - TSDataType.TEXT - })); - - for (int i = 0; i < 10000; ++i) { - localIoTDBHandler.onEvent( - new LocalIoTDBEvent( - i, - i, - (long) i, - (float) i, - (double) i, - i % 2 == 0, - Binary.valueOf(String.valueOf(i)))); - } - - localIoTDBHandler.close(); - - Class.forName(Config.JDBC_DRIVER_NAME); - try (Connection connection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); - Statement statement = connection.createStatement()) { - Assert.assertTrue(statement.execute("select * from root.**")); - - try (ResultSet resultSet = statement.getResultSet()) { - ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); - - checkHeader( - resultSetMetaData, - "Time,root.sg1.d1.s1,root.sg1.d1.s2,root.sg1.d1.s3," - + "root.sg1.d1.s4,root.sg1.d1.s5,root.sg1.d1.s6,", - new int[] { - Types.TIMESTAMP, - Types.INTEGER, - Types.BIGINT, - Types.FLOAT, - Types.DOUBLE, - Types.BOOLEAN, - Types.VARCHAR, - }); - - int count = 0; - while (resultSet.next()) { - for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { - try { - assertEquals(count, Double.parseDouble(resultSet.getString(i)), 0.0); - } catch (NumberFormatException e) { - assertEquals(count % 2 == 0, Boolean.parseBoolean(resultSet.getString(i))); - } - } - count++; - } - Assert.assertEquals(10000, count); - } - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - private void checkHeader( - ResultSetMetaData resultSetMetaData, String expectedHeaderStrings, int[] expectedTypes) - throws SQLException { - String[] expectedHeaders = expectedHeaderStrings.split(","); - Map<String, Integer> expectedHeaderToTypeIndexMap = new HashMap<>(); - for (int i = 0; i < expectedHeaders.length; ++i) { - expectedHeaderToTypeIndexMap.put(expectedHeaders[i], i); - } - - for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { - Integer typeIndex = expectedHeaderToTypeIndexMap.get(resultSetMetaData.getColumnName(i)); - Assert.assertNotNull(typeIndex); - Assert.assertEquals(expectedTypes[typeIndex], resultSetMetaData.getColumnType(i)); - } - } - - @Test(expected = QueryProcessException.class) - public void onEventWithWrongType1() throws Exception { - LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler(); - localIoTDBHandler.open( - new LocalIoTDBConfiguration( - "root.sg1.d1", new String[] {"s1"}, new TSDataType[] {TSDataType.INT32})); - - localIoTDBHandler.onEvent(new LocalIoTDBEvent(0, Binary.valueOf(String.valueOf(0)))); - - localIoTDBHandler.close(); - } - - @Test(expected = ClassCastException.class) - public void onEventWithWrongType2() throws Exception { - LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler(); - localIoTDBHandler.open( - new LocalIoTDBConfiguration( - "root.sg1.d1", new String[] {"s1"}, new TSDataType[] {TSDataType.TEXT})); - - localIoTDBHandler.onEvent(new LocalIoTDBEvent(0, String.valueOf(0))); - - localIoTDBHandler.close(); - } -}
