This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch pipeline_flush_task
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pipeline_flush_task by this
push:
new 0651a79 fix
0651a79 is described below
commit 0651a79d1d23f89b02e96ff3e70be94d5a11a7ee
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Jan 26 22:32:15 2021 +0800
fix
---
example/session/src/assembly/session-example.xml | 40 +++++
.../org/apache/iotdb/DataMigrationExample.java | 185 ---------------------
.../java/org/apache/iotdb/SessionPoolExample.java | 119 -------------
3 files changed, 40 insertions(+), 304 deletions(-)
diff --git a/example/session/src/assembly/session-example.xml
b/example/session/src/assembly/session-example.xml
new file mode 100644
index 0000000..4db5225
--- /dev/null
+++ b/example/session/src/assembly/session-example.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<assembly>
+ <id>server</id>
+ <formats>
+ <format>dir</format>
+ <format>zip</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>lib</outputDirectory>
+ </dependencySet>
+ </dependencySets>
+ <fileSets>
+ <fileSet>
+ <directory>src/assembly/resources</directory>
+ <outputDirectory>${file.separator}</outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git
a/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
b/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
deleted file mode 100644
index ccb878d..0000000
--- a/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.SessionDataSet.DataIterator;
-import org.apache.iotdb.session.pool.SessionDataSetWrapper;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-
-/**
- * Migrate all data belongs to a path from one IoTDB to another IoTDB Each
thread migrate one
- * series, the concurrent thread can be configured by concurrency
- *
- * This example is migrating all timeseries from a local IoTDB with 6667 port
to a local IoTDB with
- * 6668 port
- */
-public class DataMigrationExample {
-
- // used to read data from the source IoTDB
- private static SessionPool readerPool;
- // used to write data into the destination IoTDB
- private static SessionPool writerPool;
- // concurrent thread of loading timeseries data
- private static int concurrency = 5;
-
- public static void main(String[] args)
- throws IoTDBConnectionException, StatementExecutionException,
ExecutionException, InterruptedException {
-
- ExecutorService executorService = Executors.newFixedThreadPool(2 *
concurrency + 1);
-
- String path = "root";
-
- if (args.length != 0) {
- path = args[0];
- }
-
- readerPool = new SessionPool("127.0.0.1", 6667, "root", "root",
concurrency);
- writerPool = new SessionPool("127.0.0.1", 6668, "root", "root",
concurrency);
-
- SessionDataSetWrapper schemaDataSet = readerPool
- .executeQueryStatement("count timeseries " + path);
- DataIterator schemaIter = schemaDataSet.iterator();
- int total;
- if (schemaIter.next()) {
- total = schemaIter.getInt(1);
- System.out.println("Total timeseries: " + total);
- } else {
- System.out.println("Can not get timeseries schema");
- System.exit(1);
- }
- readerPool.closeResultSet(schemaDataSet);
-
- schemaDataSet = readerPool
- .executeQueryStatement("show timeseries " + path);
- schemaIter = schemaDataSet.iterator();
-
- List<Future> futureList = new ArrayList<>();
- int count = 0;
- while (schemaIter.next()) {
- count ++;
- Path currentPath = new Path(schemaIter.getString("timeseries"));
- Future future = executorService.submit(
- new LoadThread(count, currentPath,
TSDataType.valueOf(schemaIter.getString("dataType"))));
- futureList.add(future);
- }
- readerPool.closeResultSet(schemaDataSet);
-
- for (Future future : futureList) {
- future.get();
- }
- executorService.shutdown();
-
- readerPool.close();
- writerPool.close();
- }
-
-
- static class LoadThread implements Callable<Void> {
-
- String device;
- String measurement;
- Path series;
- TSDataType dataType;
- Tablet tablet;
- int i;
-
- public LoadThread(int i, Path series, TSDataType dataType) {
- this.i = i;
- this.device = series.getDevice();
- this.measurement = series.getMeasurement();
- this.dataType = dataType;
- this.series = series;
- List<MeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new MeasurementSchema(measurement, dataType));
- tablet = new Tablet(device, schemaList, 300000);
- }
-
- @Override
- public Void call() {
-
- SessionDataSetWrapper dataSet = null;
-
- try {
-
- dataSet = readerPool
- .executeQueryStatement(String.format("select %s from %s",
measurement, device));
-
- DataIterator dataIter = dataSet.iterator();
- while (dataIter.next()) {
- int row = tablet.rowSize++;
- tablet.timestamps[row] = dataIter.getLong(1);
- switch (dataType) {
- case BOOLEAN:
- ((boolean[]) tablet.values[0])[row] = dataIter.getBoolean(2);
- break;
- case INT32:
- ((int[]) tablet.values[0])[row] = dataIter.getInt(2);
- break;
- case INT64:
- ((long[]) tablet.values[0])[row] = dataIter.getLong(2);
- break;
- case FLOAT:
- ((float[]) tablet.values[0])[row] = dataIter.getFloat(2);
- break;
- case DOUBLE:
- ((double[]) tablet.values[0])[row] = dataIter.getDouble(2);
- break;
- case TEXT:
- ((Binary[]) tablet.values[0])[row] = new
Binary(dataIter.getString(2));
- break;
- }
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- writerPool.insertTablet(tablet, true);
- tablet.reset();
- }
- }
- if (tablet.rowSize != 0) {
- writerPool.insertTablet(tablet);
- tablet.reset();
- }
-
- } catch (Exception e) {
- System.out.println(
- "Loading the " + i + "-th timeseries: " + series + " failed " +
e.getMessage());
- return null;
- } finally {
- readerPool.closeResultSet(dataSet);
- }
-
- System.out.println("Loading the " + i + "-th timeseries: " + series + "
success");
- return null;
- }
-
- }
-}
\ No newline at end of file
diff --git
a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
deleted file mode 100644
index 395312e..0000000
--- a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.SessionDataSet.DataIterator;
-import org.apache.iotdb.session.pool.SessionDataSetWrapper;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-
-public class SessionPoolExample {
-
- private static SessionPool pool;
- private static ExecutorService service;
-
- public static void main(String[] args)
- throws StatementExecutionException, IoTDBConnectionException,
InterruptedException {
- pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
- service = Executors.newFixedThreadPool(10);
-
- insertRecord();
- queryByRowRecord();
- Thread.sleep(1000);
- queryByIterator();
- pool.close();
- service.shutdown();
- }
-
- // more insert example, see SessionExample.java
- private static void insertRecord() throws StatementExecutionException,
IoTDBConnectionException {
- String deviceId = "root.sg1.d1";
- List<String> measurements = new ArrayList<>();
- List<TSDataType> types = new ArrayList<>();
- measurements.add("s1");
- measurements.add("s2");
- measurements.add("s3");
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
- types.add(TSDataType.INT64);
-
- for (long time = 0; time < 10; time++) {
- List<Object> values = new ArrayList<>();
- values.add(1L);
- values.add(2L);
- values.add(3L);
- pool.insertRecord(deviceId, time, measurements, types, values);
- }
- }
-
- private static void queryByRowRecord() {
- for (int i = 0; i < 1; i++) {
- service.submit(() -> {
- SessionDataSetWrapper wrapper = null;
- try {
- wrapper = pool.executeQueryStatement("select * from root.sg1.d1");
- System.out.println(wrapper.getColumnNames());
- System.out.println(wrapper.getColumnTypes());
- while (wrapper.hasNext()) {
- System.out.println(wrapper.next());
- }
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- e.printStackTrace();
- } finally {
- // remember to close data set finally!
- pool.closeResultSet(wrapper);
- }
- });
- }
- }
-
- private static void queryByIterator() {
- for (int i = 0; i < 1; i++) {
- service.submit(() -> {
- SessionDataSetWrapper wrapper = null;
- try {
- wrapper = pool.executeQueryStatement("select * from root.sg1.d1");
- // get DataIterator like JDBC
- DataIterator dataIterator = wrapper.iterator();
- System.out.println(wrapper.getColumnNames());
- System.out.println(wrapper.getColumnTypes());
- while (dataIterator.next()) {
- StringBuilder builder = new StringBuilder();
- for (String columnName: wrapper.getColumnNames()) {
- builder.append(dataIterator.getString(columnName) + " ");
- }
- System.out.println(builder.toString());
- }
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- e.printStackTrace();
- } finally {
- // remember to close data set finally!
- pool.closeResultSet(wrapper);
- }
- });
- }
- }
-
-}