This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 387c2102600 [IOTDB-5967] Pipe: fix convertToTablet bug and introduce
PipeEmptyTabletInsertionEvent (#10044)
387c2102600 is described below
commit 387c2102600273842a1e8578c79455366581b540
Author: Itami Sho <[email protected]>
AuthorDate: Sun Jun 4 16:53:37 2023 +0800
[IOTDB-5967] Pipe: fix convertToTablet bug and introduce
PipeEmptyTabletInsertionEvent (#10044)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../iotdb/pipe/api/collector/RowCollector.java | 5 +-
.../impl/iotdb/v1/IoTDBThriftConnectorV1.java | 38 +++--
.../event/impl/PipeEmptyTabletInsertionEvent.java | 28 ++--
...ava => PipeInsertNodeTabletInsertionEvent.java} | 31 +++-
...nt.java => PipeTabletTabletInsertionEvent.java} | 6 +-
.../realtime/PipeRealtimeCollectEventFactory.java | 6 +-
.../core/event/realtime/TsFileEpochManager.java | 6 +-
.../db/pipe/core/event/view/access/PipeRow.java | 22 +--
.../event/view/collector/PipeRowCollector.java | 10 +-
.../TabletInsertionDataContainer.java | 106 ++++--------
.../TsFileInsertionDataContainer.java | 4 +-
.../PipeInsertNodeTabletInsertionEventTest.java | 178 +++++++++++++++++++++
12 files changed, 296 insertions(+), 144 deletions(-)
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
index bdd96be926b..0518f8b2040 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
@@ -26,9 +26,8 @@ import java.io.IOException;
import java.util.function.BiConsumer;
/**
- * Used to collect rows generated by {@link
TabletInsertionEvent#processRowByRow(BiConsumer)},{@link
- * TabletInsertionEvent#processByIterator(BiConsumer)} or {@link
- * TabletInsertionEvent#processTablet(BiConsumer)}.
+ * Used to collect rows generated by {@link
TabletInsertionEvent#processRowByRow(BiConsumer)},
+ * {@link TabletInsertionEvent#processTablet(BiConsumer)}.
*/
public interface RowCollector {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
index 40c156e3285..c67e3966cc2 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
@@ -33,8 +33,9 @@ import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransfe
import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferHandshakeReq;
import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq;
import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferTabletReq;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeInsertionEvent;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent;
+import
org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -117,13 +118,15 @@ public class IoTDBThriftConnectorV1 implements
PipeConnector {
public void transfer(TabletInsertionEvent tabletInsertionEvent) throws
Exception {
// PipeProcessor can change the type of TabletInsertionEvent
try {
- if (tabletInsertionEvent instanceof PipeInsertNodeInsertionEvent) {
- doTransfer((PipeInsertNodeInsertionEvent) tabletInsertionEvent);
- } else if (tabletInsertionEvent instanceof PipeTabletInsertionEvent) {
- doTransfer((PipeTabletInsertionEvent) tabletInsertionEvent);
+ if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
+ } else if (tabletInsertionEvent instanceof
PipeTabletTabletInsertionEvent) {
+ doTransfer((PipeTabletTabletInsertionEvent) tabletInsertionEvent);
+ } else if (tabletInsertionEvent instanceof
PipeEmptyTabletInsertionEvent) {
+ doTransfer((PipeEmptyTabletInsertionEvent) tabletInsertionEvent);
} else {
throw new NotImplementedException(
- "IoTDBThriftConnectorV1 only support PipeInsertNodeInsertionEvent
and PipeTabletInsertionEvent.");
+ "IoTDBThriftConnectorV1 only support
PipeInsertNodeTabletInsertionEvent and PipeTabletTabletInsertionEvent.");
}
} catch (TException e) {
LOGGER.error(
@@ -136,35 +139,40 @@ public class IoTDBThriftConnectorV1 implements
PipeConnector {
}
}
- private void doTransfer(PipeInsertNodeInsertionEvent
pipeInsertNodeInsertionEvent)
+ private void doTransfer(PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent)
throws PipeException, TException, WALPipeException {
final TPipeTransferResp resp =
client.pipeTransfer(
PipeTransferInsertNodeReq.toTPipeTransferReq(
- pipeInsertNodeInsertionEvent.getInsertNode()));
+ pipeInsertNodeTabletInsertionEvent.getInsertNode()));
if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(
String.format(
- "Transfer PipeInsertNodeInsertionEvent %s error, result status
%s",
- pipeInsertNodeInsertionEvent, resp.status));
+ "Transfer PipeInsertNodeTabletInsertionEvent %s error, result
status %s",
+ pipeInsertNodeTabletInsertionEvent, resp.status));
}
}
- private void doTransfer(PipeTabletInsertionEvent pipeTabletInsertionEvent)
+ private void doTransfer(PipeTabletTabletInsertionEvent
pipeTabletTabletInsertionEvent)
throws PipeException, TException, IOException {
final TPipeTransferResp resp =
client.pipeTransfer(
-
PipeTransferTabletReq.toTPipeTransferReq(pipeTabletInsertionEvent.convertToTablet()));
+ PipeTransferTabletReq.toTPipeTransferReq(
+ pipeTabletTabletInsertionEvent.convertToTablet()));
if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(
String.format(
- "Transfer PipeTabletInsertionEvent %s error, result status %s",
- pipeTabletInsertionEvent, resp.status));
+ "Transfer PipeTabletTabletInsertionEvent %s error, result status
%s",
+ pipeTabletTabletInsertionEvent, resp.status));
}
}
+ private void doTransfer(PipeEmptyTabletInsertionEvent
pipeEmptyTabletInsertionEvent) {
+ // do nothing
+ }
+
@Override
public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws
Exception {
// PipeProcessor can change the type of TabletInsertionEvent
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java
similarity index 65%
copy from
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
copy to
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java
index bdd96be926b..855da8fa8b0 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java
@@ -17,27 +17,23 @@
* under the License.
*/
-package org.apache.iotdb.pipe.api.collector;
+package org.apache.iotdb.db.pipe.core.event.impl;
import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.tsfile.write.record.Tablet;
-import java.io.IOException;
import java.util.function.BiConsumer;
-/**
- * Used to collect rows generated by {@link
TabletInsertionEvent#processRowByRow(BiConsumer)},{@link
- * TabletInsertionEvent#processByIterator(BiConsumer)} or {@link
- * TabletInsertionEvent#processTablet(BiConsumer)}.
- */
-public interface RowCollector {
+public class PipeEmptyTabletInsertionEvent implements TabletInsertionEvent {
+ @Override
+ public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector>
consumer) {
+ return this;
+ }
- /**
- * Collects a row.
- *
- * @param row Row to be collected
- * @throws IOException if any I/O errors occur
- * @see Row
- */
- void collectRow(Row row) throws IOException;
+ @Override
+ public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector>
consumer) {
+ return this;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java
similarity index 83%
rename from
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeInsertionEvent.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java
index 3b980f10b9d..416fea8b734 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.core.event.impl;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.core.event.view.datastructure.TabletInsertionDataContainer;
@@ -38,21 +39,23 @@ import org.slf4j.LoggerFactory;
import java.util.function.BiConsumer;
-public class PipeInsertNodeInsertionEvent extends EnrichedEvent implements
TabletInsertionEvent {
+public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
+ implements TabletInsertionEvent {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipeInsertNodeInsertionEvent.class);
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(PipeInsertNodeTabletInsertionEvent.class);
private final WALEntryHandler walEntryHandler;
private final ProgressIndex progressIndex;
private TabletInsertionDataContainer dataContainer;
- public PipeInsertNodeInsertionEvent(
+ public PipeInsertNodeTabletInsertionEvent(
WALEntryHandler walEntryHandler, ProgressIndex progressIndex) {
this(walEntryHandler, progressIndex, null, null);
}
- private PipeInsertNodeInsertionEvent(
+ private PipeInsertNodeTabletInsertionEvent(
WALEntryHandler walEntryHandler,
ProgressIndex progressIndex,
PipeTaskMeta pipeTaskMeta,
@@ -104,9 +107,10 @@ public class PipeInsertNodeInsertionEvent extends
EnrichedEvent implements Table
}
@Override
- public PipeInsertNodeInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+ public PipeInsertNodeTabletInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
PipeTaskMeta pipeTaskMeta, String pattern) {
- return new PipeInsertNodeInsertionEvent(walEntryHandler, progressIndex,
pipeTaskMeta, pattern);
+ return new PipeInsertNodeTabletInsertionEvent(
+ walEntryHandler, progressIndex, pipeTaskMeta, pattern);
}
/////////////////////////// TabletInsertionEvent ///////////////////////////
@@ -149,11 +153,24 @@ public class PipeInsertNodeInsertionEvent extends
EnrichedEvent implements Table
}
}
+ @TestOnly
+ public Tablet convertToTabletForTest(InsertNode insertNode, String pattern) {
+ try {
+ if (dataContainer == null) {
+ dataContainer = new TabletInsertionDataContainer(insertNode, pattern);
+ }
+ return dataContainer.convertToTablet();
+ } catch (Exception e) {
+ LOGGER.error("Process tablet error.", e);
+ throw new PipeException("Process tablet error.", e);
+ }
+ }
+
/////////////////////////// Object ///////////////////////////
@Override
public String toString() {
- return "PipeTabletInsertionEvent{"
+ return "PipeTabletTabletInsertionEvent{"
+ "walEntryHandler="
+ walEntryHandler
+ ", progressIndex="
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java
similarity index 92%
rename from
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java
index 399b091264d..014972abd7f 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java
@@ -29,18 +29,18 @@ import org.apache.iotdb.tsfile.write.record.Tablet;
import java.util.Objects;
import java.util.function.BiConsumer;
-public class PipeTabletInsertionEvent implements TabletInsertionEvent {
+public class PipeTabletTabletInsertionEvent implements TabletInsertionEvent {
private final Tablet tablet;
private final String pattern;
private TabletInsertionDataContainer dataContainer;
- public PipeTabletInsertionEvent(Tablet tablet) {
+ public PipeTabletTabletInsertionEvent(Tablet tablet) {
this(Objects.requireNonNull(tablet), null);
}
- public PipeTabletInsertionEvent(Tablet tablet, String pattern) {
+ public PipeTabletTabletInsertionEvent(Tablet tablet, String pattern) {
this.tablet = Objects.requireNonNull(tablet);
this.pattern = pattern;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
index 61d65cd43e8..b2a622f7c05 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.core.event.realtime;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeInsertionEvent;
+import
org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.wal.utils.WALEntryHandler;
@@ -36,8 +36,8 @@ public class PipeRealtimeCollectEventFactory {
public static PipeRealtimeCollectEvent createCollectEvent(
WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource
resource) {
- return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeInsertionEvent(
- new PipeInsertNodeInsertionEvent(walEntryHandler,
insertNode.getProgressIndex()),
+ return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent(
+ new PipeInsertNodeTabletInsertionEvent(walEntryHandler,
insertNode.getProgressIndex()),
insertNode,
resource);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
index 3e09b86e74f..5a33649aad2 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.core.event.realtime;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeInsertionEvent;
+import
org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
import org.slf4j.Logger;
@@ -59,8 +59,8 @@ public class TsFileEpochManager {
event.getPattern());
}
- public PipeRealtimeCollectEvent bindPipeInsertNodeInsertionEvent(
- PipeInsertNodeInsertionEvent event, InsertNode node, TsFileResource
resource) {
+ public PipeRealtimeCollectEvent bindPipeInsertNodeTabletInsertionEvent(
+ PipeInsertNodeTabletInsertionEvent event, InsertNode node,
TsFileResource resource) {
return new PipeRealtimeCollectEvent(
event,
filePath2Epoch.computeIfAbsent(resource.getTsFilePath(),
TsFileEpoch::new),
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
index 043be5ec83f..dfab1e1aa2b 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
@@ -39,7 +39,7 @@ public class PipeRow implements Row {
private final MeasurementSchema[] measurementSchemaList;
private final long[] timestampColumn;
- private final Object[][] valueColumns;
+ private final Object[] valueColumns;
private final TSDataType[] valueColumnTypes;
private final String[] columnNameStringList;
@@ -49,7 +49,7 @@ public class PipeRow implements Row {
String deviceId,
MeasurementSchema[] measurementSchemaList,
long[] timestampColumn,
- Object[][] valueColumns,
+ Object[] valueColumns,
TSDataType[] valueColumnTypes,
String[] columnNameStringList) {
this.rowIndex = rowIndex;
@@ -68,42 +68,42 @@ public class PipeRow implements Row {
@Override
public int getInt(int columnIndex) {
- return (int) valueColumns[columnIndex][rowIndex];
+ return ((int[]) valueColumns[columnIndex])[rowIndex];
}
@Override
public long getLong(int columnIndex) {
- return (long) valueColumns[columnIndex][rowIndex];
+ return ((long[]) valueColumns[columnIndex])[rowIndex];
}
@Override
public float getFloat(int columnIndex) {
- return (float) valueColumns[columnIndex][rowIndex];
+ return ((float[]) valueColumns[columnIndex])[rowIndex];
}
@Override
public double getDouble(int columnIndex) {
- return (double) valueColumns[columnIndex][rowIndex];
+ return ((double[]) valueColumns[columnIndex])[rowIndex];
}
@Override
public boolean getBoolean(int columnIndex) {
- return (boolean) valueColumns[columnIndex][rowIndex];
+ return ((boolean[]) valueColumns[columnIndex])[rowIndex];
}
@Override
public Binary getBinary(int columnIndex) {
- return Binary.valueOf((String) valueColumns[columnIndex][rowIndex]);
+ return ((Binary[]) valueColumns[columnIndex])[rowIndex];
}
@Override
public String getString(int columnIndex) {
- return (String) valueColumns[columnIndex][rowIndex];
+ return ((Binary[]) valueColumns[columnIndex])[rowIndex].getStringValue();
}
@Override
public Object getObject(int columnIndex) {
- return valueColumns[columnIndex][rowIndex];
+ return ((Object[]) valueColumns[columnIndex])[rowIndex];
}
@Override
@@ -113,7 +113,7 @@ public class PipeRow implements Row {
@Override
public boolean isNull(int columnIndex) {
- return valueColumns[columnIndex][rowIndex] == null;
+ return ((Object[]) valueColumns[columnIndex])[rowIndex] == null;
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
index 3d39e08c32a..ab0371252f3 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
@@ -19,7 +19,8 @@
package org.apache.iotdb.db.pipe.core.event.view.collector;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.view.access.PipeRow;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
@@ -65,7 +66,12 @@ public class PipeRowCollector implements RowCollector {
}
public TabletInsertionEvent toTabletInsertionEvent() {
- PipeTabletInsertionEvent tabletInsertionEvent = new
PipeTabletInsertionEvent(tablet);
+ if (tablet == null) {
+ return new PipeEmptyTabletInsertionEvent();
+ }
+
+ PipeTabletTabletInsertionEvent tabletInsertionEvent =
+ new PipeTabletTabletInsertionEvent(tablet);
this.tablet = null;
return tabletInsertionEvent;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
index f61d4f623ad..16d21f73138 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.view.access.PipeRow;
import org.apache.iotdb.db.pipe.core.event.view.collector.PipeRowCollector;
import org.apache.iotdb.pipe.api.access.Row;
@@ -51,7 +52,7 @@ public class TabletInsertionDataContainer {
private String[] columnNameStringList;
private long[] timestampColumn;
- private Object[][] valueColumns;
+ private Object[] valueColumns;
private TSDataType[] valueColumnTypes;
private BitMap[] nullValueColumnBitmaps;
private int rowCount;
@@ -97,7 +98,7 @@ public class TabletInsertionDataContainer {
this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
this.columnNameStringList = new String[filteredColumnSize];
- this.valueColumns = new Object[filteredColumnSize][1];
+ this.valueColumns = new Object[filteredColumnSize];
this.valueColumnTypes = new TSDataType[filteredColumnSize];
this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
@@ -111,7 +112,7 @@ public class TabletInsertionDataContainer {
final int filteredColumnIndex =
originColumnIndex2FilteredColumnIndexMapperList[i];
this.measurementSchemaList[filteredColumnIndex] =
originMeasurementSchemaList[i];
this.columnNameStringList[filteredColumnIndex] =
originColumnNameStringList[i];
- this.valueColumns[filteredColumnIndex][0] = originValueColumns[i];
+ this.valueColumns[filteredColumnIndex] = originValueColumns[i];
this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
this.nullValueColumnBitmaps[filteredColumnIndex] = new BitMap(1);
}
@@ -139,7 +140,7 @@ public class TabletInsertionDataContainer {
this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
this.columnNameStringList = new String[filteredColumnSize];
- this.valueColumns = new Object[filteredColumnSize][];
+ this.valueColumns = new Object[filteredColumnSize];
this.valueColumnTypes = new TSDataType[filteredColumnSize];
this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
@@ -188,7 +189,7 @@ public class TabletInsertionDataContainer {
this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
this.columnNameStringList = new String[filteredColumnSize];
- this.valueColumns = new Object[filteredColumnSize][];
+ this.valueColumns = new Object[filteredColumnSize];
this.valueColumnTypes = new TSDataType[filteredColumnSize];
this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
@@ -269,51 +270,53 @@ public class TabletInsertionDataContainer {
originMeasurementList, pattern,
originColumnIndex2FilteredColumnIndexMapperList);
}
- private Object[] convertToColumn(Object originColumn, TSDataType dataType,
BitMap bitMap) {
+ private Object convertToColumn(Object originColumn, TSDataType dataType,
BitMap bitMap) {
switch (dataType) {
case INT32:
final int[] intValues = (int[]) originColumn;
- final Integer[] integerValues = new Integer[intValues.length];
+ final int[] integerValues = new int[intValues.length];
for (int i = 0; i < intValues.length; i++) {
- integerValues[i] = bitMap != null && bitMap.isMarked(i) ? null :
intValues[i];
+ integerValues[i] = bitMap != null && bitMap.isMarked(i) ? 0 :
intValues[i];
}
return integerValues;
case INT64:
final long[] longValues = (long[]) originColumn;
- final Long[] longValues2 = new Long[longValues.length];
+ final long[] longValues2 = new long[longValues.length];
for (int i = 0; i < longValues.length; i++) {
- longValues2[i] = bitMap != null && bitMap.isMarked(i) ? null :
longValues[i];
+ longValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 :
longValues[i];
}
return longValues2;
case FLOAT:
final float[] floatValues = (float[]) originColumn;
- final Float[] floatValues2 = new Float[floatValues.length];
+ final float[] floatValues2 = new float[floatValues.length];
for (int i = 0; i < floatValues.length; i++) {
- floatValues2[i] = bitMap != null && bitMap.isMarked(i) ? null :
floatValues[i];
+ floatValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 :
floatValues[i];
}
return floatValues2;
case DOUBLE:
final double[] doubleValues = (double[]) originColumn;
- final Double[] doubleValues2 = new Double[doubleValues.length];
+ final double[] doubleValues2 = new double[doubleValues.length];
for (int i = 0; i < doubleValues.length; i++) {
- doubleValues2[i] = bitMap != null && bitMap.isMarked(i) ? null :
doubleValues[i];
+ doubleValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 :
doubleValues[i];
}
return doubleValues2;
case BOOLEAN:
final boolean[] booleanValues = (boolean[]) originColumn;
- final Boolean[] booleanValues2 = new Boolean[booleanValues.length];
+ final boolean[] booleanValues2 = new boolean[booleanValues.length];
for (int i = 0; i < booleanValues.length; i++) {
- booleanValues2[i] = bitMap != null && bitMap.isMarked(i) ? null :
booleanValues[i];
+ booleanValues2[i] = (bitMap == null || !bitMap.isMarked(i)) &&
booleanValues[i];
}
return booleanValues2;
case TEXT:
final Binary[] binaryValues = (Binary[]) originColumn;
- final String[] stringValues = new String[binaryValues.length];
+ final Binary[] stringValues = new Binary[binaryValues.length];
for (int i = 0; i < binaryValues.length; i++) {
stringValues[i] =
bitMap != null && bitMap.isMarked(i)
? null
- : (binaryValues[i] == null ? null :
binaryValues[i].getStringValue());
+ : (binaryValues[i] == null
+ ? null
+ : Binary.valueOf(binaryValues[i].getStringValue()));
}
return stringValues;
default:
@@ -326,6 +329,10 @@ public class TabletInsertionDataContainer {
public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector>
consumer) {
final PipeRowCollector rowCollector = new PipeRowCollector();
+ if (valueColumns.length == 0) {
+ return new PipeEmptyTabletInsertionEvent();
+ }
+
for (int i = 0; i < timestampColumn.length; i++) {
consumer.accept(
new PipeRow(
@@ -355,76 +362,17 @@ public class TabletInsertionDataContainer {
}
final int columnSize = measurementSchemaList.length;
- final int rowSize = valueColumns[0].length;
final List<MeasurementSchema> measurementSchemaArrayList =
new ArrayList<>(Arrays.asList(measurementSchemaList).subList(0,
columnSize));
- final Tablet newTablet = new Tablet(deviceId, measurementSchemaArrayList,
rowSize);
+ final Tablet newTablet = new Tablet(deviceId, measurementSchemaArrayList,
rowCount);
newTablet.timestamps = timestampColumn;
newTablet.bitMaps = nullValueColumnBitmaps;
- newTablet.values = squashFromColumnList(valueColumns, valueColumnTypes);
+ newTablet.values = valueColumns;
newTablet.rowSize = rowCount;
tablet = newTablet;
return tablet;
}
-
- private Object[] squashFromColumnList(Object[][] valueColumns, TSDataType[]
valueColumnTypes) {
- final Object[] values = new Object[valueColumns.length];
- for (int i = 0; i < valueColumns.length; i++) {
- values[i] = squashFromColumn(valueColumns[i], valueColumnTypes[i]);
- }
- return values;
- }
-
- private Object squashFromColumn(Object[] valueColumn, TSDataType
valueColumnType) {
- switch (valueColumnType) {
- case INT32:
- final Integer[] intValues = (Integer[]) valueColumn;
- final int[] intValues2 = new int[intValues.length];
- for (int i = 0; i < intValues.length; i++) {
- intValues2[i] = intValues[i] == null ? 0 : intValues[i];
- }
- return intValues2;
- case INT64:
- final Long[] longValues = (Long[]) valueColumn;
- final long[] longValues2 = new long[longValues.length];
- for (int i = 0; i < longValues.length; i++) {
- longValues2[i] = longValues[i] == null ? 0 : longValues[i];
- }
- return longValues2;
- case FLOAT:
- final Float[] floatValues = (Float[]) valueColumn;
- final float[] floatValues2 = new float[floatValues.length];
- for (int i = 0; i < floatValues.length; i++) {
- floatValues2[i] = floatValues[i] == null ? 0 : floatValues[i];
- }
- return floatValues2;
- case DOUBLE:
- final Double[] doubleValues = (Double[]) valueColumn;
- final double[] doubleValues2 = new double[doubleValues.length];
- for (int i = 0; i < doubleValues.length; i++) {
- doubleValues2[i] = doubleValues[i] == null ? 0 : doubleValues[i];
- }
- return doubleValues2;
- case BOOLEAN:
- final Boolean[] booleanValues = (Boolean[]) valueColumn;
- final boolean[] booleanValues2 = new boolean[booleanValues.length];
- for (int i = 0; i < booleanValues.length; i++) {
- booleanValues2[i] = booleanValues[i] != null && booleanValues[i];
- }
- return booleanValues2;
- case TEXT:
- final String[] stringValues = (String[]) valueColumn;
- final Binary[] binaryValues = new Binary[stringValues.length];
- for (int i = 0; i < stringValues.length; i++) {
- binaryValues[i] = stringValues[i] == null ? null :
Binary.valueOf(stringValues[i]);
- }
- return binaryValues;
- default:
- throw new UnSupportedDataTypeException(
- String.format("Data type %s is not supported.", valueColumnType));
- }
- }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
index 260814d609f..9035a8fe0bb 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.core.event.view.datastructure;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
@@ -124,7 +124,7 @@ public class TsFileInsertionDataContainer {
@Override
public TabletInsertionEvent next() {
- return new PipeTabletInsertionEvent(tabletIterator.next());
+ return new PipeTabletTabletInsertionEvent(tabletIterator.next());
}
};
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeInsertNodeTabletInsertionEventTest.java
b/server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeInsertNodeTabletInsertionEventTest.java
new file mode 100644
index 00000000000..5a211fc2cff
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeInsertNodeTabletInsertionEventTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+/**
+ * Unit test for the insert-node-to-tablet conversion reached through {@code
+ * convertToTabletForTest} of {@link PipeInsertNodeTabletInsertionEvent}.
+ *
+ * <p>The fixture builds one {@link InsertRowNode} and one {@link InsertTabletNode} for the same
+ * device and the same six measurements (one per entry of {@code dataTypes}), together with the
+ * {@link Tablet} each node is expected to be converted to.
+ *
+ * <p>NOTE(review): the single {@code values} array instance is handed to both insert nodes and is
+ * assigned to the {@code values} field of both expected tablets. {@code createTablet()} first
+ * fills it with 1-row columns and then overwrites the same slots with {@code times.length}-row
+ * columns, so {@code tabletForInsertRowNode.values} ends up pointing at the larger columns. This
+ * sharing is kept exactly as it was; confirm that it is intended.
+ */
+public class PipeInsertNodeTabletInsertionEventTest {
+
+  InsertRowNode insertRowNode;
+  InsertTabletNode insertTabletNode;
+
+  final String deviceId = "root.sg.d1";
+  final long[] times = new long[] {110L, 111L, 112L, 113L, 114L};
+  final String[] measurementIds = new String[] {"s1", "s2", "s3", "s4", "s5", "s6"};
+  final TSDataType[] dataTypes =
+      new TSDataType[] {
+        TSDataType.INT32,
+        TSDataType.INT64,
+        TSDataType.FLOAT,
+        TSDataType.DOUBLE,
+        TSDataType.BOOLEAN,
+        TSDataType.TEXT
+      };
+
+  // Sized from measurementIds so the arrays cannot drift apart from the measurement list.
+  final MeasurementSchema[] schemas = new MeasurementSchema[measurementIds.length];
+  final Object[] values = new Object[measurementIds.length];
+
+  final String pattern = "root.sg.d1";
+
+  Tablet tabletForInsertRowNode;
+  Tablet tabletForInsertTabletNode;
+
+  /**
+   * Builds the whole fixture. The schemas are created first because both insert nodes and both
+   * expected tablets are constructed from them.
+   */
+  @Before
+  public void setUp() throws Exception {
+    createMeasurementSchema();
+    createInsertRowNode();
+    createInsertTabletNode();
+    createTablet();
+  }
+
+  /** Creates one {@link MeasurementSchema} per measurement id / data type pair. */
+  private void createMeasurementSchema() {
+    for (int i = 0; i < schemas.length; i++) {
+      schemas[i] = new MeasurementSchema(measurementIds[i], dataTypes[i]);
+    }
+  }
+
+  /** Creates the row-insert node under test, timestamped with {@code times[0]}. */
+  private void createInsertRowNode() throws IllegalPathException {
+    insertRowNode =
+        new InsertRowNode(
+            new PlanNodeId("plan node 1"),
+            new PartialPath(deviceId),
+            false,
+            measurementIds,
+            dataTypes,
+            schemas,
+            times[0],
+            values,
+            false);
+  }
+
+  /** Creates the tablet-insert node under test, covering every timestamp in {@code times}. */
+  private void createInsertTabletNode() throws IllegalPathException {
+    insertTabletNode =
+        new InsertTabletNode(
+            new PlanNodeId("plannode 1"),
+            new PartialPath(deviceId),
+            false,
+            measurementIds,
+            dataTypes,
+            schemas,
+            times,
+            null,
+            values,
+            times.length);
+  }
+
+  /** Builds the expected tablets for the row node (one row) and the tablet node (all rows). */
+  private void createTablet() {
+    // expected tablet for insertRowNode
+    fillValueColumns(1);
+    tabletForInsertRowNode = new Tablet(deviceId, Arrays.asList(schemas), 1);
+    tabletForInsertRowNode.values = values;
+    tabletForInsertRowNode.timestamps = new long[] {times[0]};
+    tabletForInsertRowNode.rowSize = 1;
+    tabletForInsertRowNode.bitMaps = createBitMaps(1);
+
+    // expected tablet for insertTabletNode; this refills the shared values array in place
+    fillValueColumns(times.length);
+    tabletForInsertTabletNode = new Tablet(deviceId, Arrays.asList(schemas), times.length);
+    tabletForInsertTabletNode.values = values;
+    tabletForInsertTabletNode.timestamps = times;
+    tabletForInsertTabletNode.rowSize = times.length;
+    tabletForInsertTabletNode.bitMaps = createBitMaps(times.length);
+  }
+
+  /** Returns one freshly constructed {@link BitMap} of {@code rowCount} per measurement. */
+  private BitMap[] createBitMaps(int rowCount) {
+    final BitMap[] bitMaps = new BitMap[schemas.length];
+    for (int i = 0; i < bitMaps.length; i++) {
+      bitMaps[i] = new BitMap(rowCount);
+    }
+    return bitMaps;
+  }
+
+  /**
+   * Replaces every slot of the shared {@code values} array with a new column of {@code rowCount}
+   * identical sample values, one column per entry of {@code dataTypes} and in the same order.
+   */
+  private void fillValueColumns(int rowCount) {
+    final int[] intColumn = new int[rowCount];
+    final long[] longColumn = new long[rowCount];
+    final float[] floatColumn = new float[rowCount];
+    final double[] doubleColumn = new double[rowCount];
+    final boolean[] booleanColumn = new boolean[rowCount];
+    final Binary[] textColumn = new Binary[rowCount];
+
+    for (int row = 0; row < rowCount; row++) {
+      intColumn[row] = 100;
+      longColumn[row] = 10000;
+      floatColumn[row] = 2;
+      doubleColumn[row] = 1.0;
+      booleanColumn[row] = false;
+      textColumn[row] = Binary.valueOf("text");
+    }
+
+    values[0] = intColumn;
+    values[1] = longColumn;
+    values[2] = floatColumn;
+    values[3] = doubleColumn;
+    values[4] = booleanColumn;
+    values[5] = textColumn;
+  }
+
+  /** Converts both insert nodes and compares each result with its expected tablet. */
+  @Test
+  public void convertToTabletForTest() {
+    PipeInsertNodeTabletInsertionEvent event1 =
+        new PipeInsertNodeTabletInsertionEvent(null, null);
+    Tablet tablet1 = event1.convertToTabletForTest(insertRowNode, pattern);
+    // JUnit's contract is assertEquals(expected, actual); keeps failure messages truthful.
+    Assert.assertEquals(tabletForInsertRowNode, tablet1);
+
+    PipeInsertNodeTabletInsertionEvent event2 =
+        new PipeInsertNodeTabletInsertionEvent(null, null);
+    Tablet tablet2 = event2.convertToTabletForTest(insertTabletNode, pattern);
+    Assert.assertEquals(tabletForInsertTabletNode, tablet2);
+  }
+}