This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-tablet-covert
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c449f57bc0010229dd7a214e51600b3e5536c0f0
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Jun 5 20:11:30 2023 +0800

    fix: insertRecordNode column conversion bug
---
 .../event/dml/insertion/TabletInsertionEvent.java  |  12 +-
 .../impl/iotdb/v1/IoTDBThriftConnectorV1.java      |  19 +--
 .../event/impl/PipeEmptyTabletInsertionEvent.java  |  39 -----
 .../impl/PipeInsertNodeTabletInsertionEvent.java   |  16 +-
 ...Event.java => PipeRawTabletInsertionEvent.java} |  20 ++-
 .../db/pipe/core/event/view/access/PipeRow.java    |  12 +-
 .../event/view/collector/PipeRowCollector.java     |  23 +--
 .../TabletInsertionDataContainer.java              | 176 +++++++++------------
 .../TsFileInsertionDataContainer.java              |   4 +-
 .../core/processor/PipeDoNothingProcessor.java     |  14 +-
 10 files changed, 138 insertions(+), 197 deletions(-)

diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
index 4a8073a5a26..09b129a9cc4 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
@@ -32,16 +32,16 @@ public interface TabletInsertionEvent extends Event {
   /**
    * The consumer processes the data row by row and collects the results by 
RowCollector.
    *
-   * @return TabletInsertionEvent a new TabletInsertionEvent contains the 
results collected by the
-   *     RowCollector
+   * @return Iterable<TabletInsertionEvent> a list of new TabletInsertionEvent 
contains the results
+   *     collected by the RowCollector
    */
-  TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer);
+  Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, RowCollector> 
consumer);
 
   /**
    * The consumer processes the Tablet directly and collects the results by 
RowCollector.
    *
-   * @return TabletInsertionEvent a new TabletInsertionEvent contains the 
results collected by the
-   *     RowCollector
+   * @return Iterable<TabletInsertionEvent> a list of new TabletInsertionEvent 
contains the results
+   *     collected by the RowCollector
    */
-  TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> 
consumer);
+  Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, 
RowCollector> consumer);
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
index c67e3966cc2..1636d0fdc02 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
@@ -33,9 +33,8 @@ import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransfe
 import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferHandshakeReq;
 import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferTabletReq;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.PipeConnector;
@@ -120,10 +119,8 @@ public class IoTDBThriftConnectorV1 implements 
PipeConnector {
     try {
       if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
         doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
-      } else if (tabletInsertionEvent instanceof 
PipeTabletTabletInsertionEvent) {
-        doTransfer((PipeTabletTabletInsertionEvent) tabletInsertionEvent);
-      } else if (tabletInsertionEvent instanceof 
PipeEmptyTabletInsertionEvent) {
-        doTransfer((PipeEmptyTabletInsertionEvent) tabletInsertionEvent);
+      } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
+        doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent);
       } else {
         throw new NotImplementedException(
             "IoTDBThriftConnectorV1 only support 
PipeInsertNodeTabletInsertionEvent and PipeTabletTabletInsertionEvent.");
@@ -154,25 +151,21 @@ public class IoTDBThriftConnectorV1 implements 
PipeConnector {
     }
   }
 
-  private void doTransfer(PipeTabletTabletInsertionEvent 
pipeTabletTabletInsertionEvent)
+  private void doTransfer(PipeRawTabletInsertionEvent 
pipeRawTabletInsertionEvent)
       throws PipeException, TException, IOException {
     final TPipeTransferResp resp =
         client.pipeTransfer(
             PipeTransferTabletReq.toTPipeTransferReq(
-                pipeTabletTabletInsertionEvent.convertToTablet()));
+                pipeRawTabletInsertionEvent.convertToTablet()));
 
     if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new PipeException(
           String.format(
               "Transfer PipeTabletTabletInsertionEvent %s error, result status 
%s",
-              pipeTabletTabletInsertionEvent, resp.status));
+              pipeRawTabletInsertionEvent, resp.status));
     }
   }
 
-  private void doTransfer(PipeEmptyTabletInsertionEvent 
pipeEmptyTabletInsertionEvent) {
-    // do nothing
-  }
-
   @Override
   public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
     // PipeProcessor can change the type of TabletInsertionEvent
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java
deleted file mode 100644
index 855da8fa8b0..00000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.event.impl;
-
-import org.apache.iotdb.pipe.api.access.Row;
-import org.apache.iotdb.pipe.api.collector.RowCollector;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-
-import java.util.function.BiConsumer;
-
-public class PipeEmptyTabletInsertionEvent implements TabletInsertionEvent {
-  @Override
-  public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> 
consumer) {
-    return this;
-  }
-
-  @Override
-  public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> 
consumer) {
-    return this;
-  }
-}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java
index 416fea8b734..d9a895acbae 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java
@@ -116,7 +116,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   /////////////////////////// TabletInsertionEvent ///////////////////////////
 
   @Override
-  public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> 
consumer) {
+  public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, 
RowCollector> consumer) {
     try {
       if (dataContainer == null) {
         dataContainer = new TabletInsertionDataContainer(getInsertNode(), 
getPattern());
@@ -129,7 +129,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   }
 
   @Override
-  public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> 
consumer) {
+  public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, 
RowCollector> consumer) {
     try {
       if (dataContainer == null) {
         dataContainer = new TabletInsertionDataContainer(getInsertNode(), 
getPattern());
@@ -141,18 +141,6 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
     }
   }
 
-  public Tablet convertToTablet() {
-    try {
-      if (dataContainer == null) {
-        dataContainer = new TabletInsertionDataContainer(getInsertNode(), 
getPattern());
-      }
-      return dataContainer.convertToTablet();
-    } catch (Exception e) {
-      LOGGER.error("Process tablet error.", e);
-      throw new PipeException("Process tablet error.", e);
-    }
-  }
-
   @TestOnly
   public Tablet convertToTabletForTest(InsertNode insertNode, String pattern) {
     try {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeRawTabletInsertionEvent.java
similarity index 74%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeRawTabletInsertionEvent.java
index 014972abd7f..4343a18aa5d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeRawTabletInsertionEvent.java
@@ -29,18 +29,18 @@ import org.apache.iotdb.tsfile.write.record.Tablet;
 import java.util.Objects;
 import java.util.function.BiConsumer;
 
-public class PipeTabletTabletInsertionEvent implements TabletInsertionEvent {
+public class PipeRawTabletInsertionEvent implements TabletInsertionEvent {
 
   private final Tablet tablet;
   private final String pattern;
 
   private TabletInsertionDataContainer dataContainer;
 
-  public PipeTabletTabletInsertionEvent(Tablet tablet) {
+  public PipeRawTabletInsertionEvent(Tablet tablet) {
     this(Objects.requireNonNull(tablet), null);
   }
 
-  public PipeTabletTabletInsertionEvent(Tablet tablet, String pattern) {
+  private PipeRawTabletInsertionEvent(Tablet tablet, String pattern) {
     this.tablet = Objects.requireNonNull(tablet);
     this.pattern = pattern;
   }
@@ -52,7 +52,7 @@ public class PipeTabletTabletInsertionEvent implements 
TabletInsertionEvent {
   /////////////////////////// TabletInsertionEvent ///////////////////////////
 
   @Override
-  public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> 
consumer) {
+  public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, 
RowCollector> consumer) {
     if (dataContainer == null) {
       dataContainer = new TabletInsertionDataContainer(tablet, getPattern());
     }
@@ -60,7 +60,7 @@ public class PipeTabletTabletInsertionEvent implements 
TabletInsertionEvent {
   }
 
   @Override
-  public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> 
consumer) {
+  public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, 
RowCollector> consumer) {
     if (dataContainer == null) {
       dataContainer = new TabletInsertionDataContainer(tablet, getPattern());
     }
@@ -68,8 +68,16 @@ public class PipeTabletTabletInsertionEvent implements 
TabletInsertionEvent {
   }
 
   public Tablet convertToTablet() {
+    final String pattern = getPattern();
+
+    // if pattern is "root", we don't need to convert, just return the 
original tablet
+    if (pattern.equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) 
{
+      return tablet;
+    }
+
+    // if pattern is not "root", we need to convert the tablet
     if (dataContainer == null) {
-      dataContainer = new TabletInsertionDataContainer(tablet, getPattern());
+      dataContainer = new TabletInsertionDataContainer(tablet, pattern);
     }
     return dataContainer.convertToTablet();
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
index dfab1e1aa2b..4a8bd65bd30 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.pipe.api.type.Binary;
 import org.apache.iotdb.pipe.api.type.Type;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.util.Arrays;
@@ -39,8 +40,9 @@ public class PipeRow implements Row {
   private final MeasurementSchema[] measurementSchemaList;
 
   private final long[] timestampColumn;
-  private final Object[] valueColumns;
   private final TSDataType[] valueColumnTypes;
+  private final Object[] valueColumns;
+  private final BitMap[] bitMaps;
 
   private final String[] columnNameStringList;
 
@@ -49,15 +51,17 @@ public class PipeRow implements Row {
       String deviceId,
       MeasurementSchema[] measurementSchemaList,
       long[] timestampColumn,
-      Object[] valueColumns,
       TSDataType[] valueColumnTypes,
+      Object[] valueColumns,
+      BitMap[] bitMaps,
       String[] columnNameStringList) {
     this.rowIndex = rowIndex;
     this.deviceId = deviceId;
     this.measurementSchemaList = measurementSchemaList;
     this.timestampColumn = timestampColumn;
-    this.valueColumns = valueColumns;
     this.valueColumnTypes = valueColumnTypes;
+    this.valueColumns = valueColumns;
+    this.bitMaps = bitMaps;
     this.columnNameStringList = columnNameStringList;
   }
 
@@ -113,7 +117,7 @@ public class PipeRow implements Row {
 
   @Override
   public boolean isNull(int columnIndex) {
-    return ((Object[]) valueColumns[columnIndex])[rowIndex] == null;
+    return bitMaps[columnIndex].isMarked(rowIndex);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
index ab0371252f3..8010b67e57e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
@@ -19,8 +19,7 @@
 
 package org.apache.iotdb.db.pipe.core.event.view.collector;
 
-import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.core.event.view.access.PipeRow;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
@@ -35,6 +34,7 @@ import java.util.List;
 
 public class PipeRowCollector implements RowCollector {
 
+  private final List<TabletInsertionEvent> tabletInsertionEventList = new 
ArrayList<>();
   private Tablet tablet = null;
 
   @Override
@@ -63,16 +63,21 @@ public class PipeRowCollector implements RowCollector {
       }
     }
     tablet.rowSize++;
-  }
 
-  public TabletInsertionEvent toTabletInsertionEvent() {
-    if (tablet == null) {
-      return new PipeEmptyTabletInsertionEvent();
+    if (tablet.rowSize == tablet.getMaxRowNumber()) {
+      collectTabletInsertionEvent();
     }
+  }
 
-    PipeTabletTabletInsertionEvent tabletInsertionEvent =
-        new PipeTabletTabletInsertionEvent(tablet);
+  private void collectTabletInsertionEvent() {
+    if (tablet != null) {
+      tabletInsertionEventList.add(new PipeRawTabletInsertionEvent(tablet));
+    }
     this.tablet = null;
-    return tabletInsertionEvent;
+  }
+
+  public Iterable<TabletInsertionEvent> convertToTabletInsertionEvents() {
+    collectTabletInsertionEvent();
+    return tabletInsertionEventList;
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
index 16d21f73138..56a6bc49632 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
@@ -23,7 +23,6 @@ import 
org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.core.event.view.access.PipeRow;
 import org.apache.iotdb.db.pipe.core.event.view.collector.PipeRowCollector;
 import org.apache.iotdb.pipe.api.access.Row;
@@ -40,6 +39,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.BiConsumer;
@@ -52,8 +52,9 @@ public class TabletInsertionDataContainer {
   private String[] columnNameStringList;
 
   private long[] timestampColumn;
-  private Object[] valueColumns;
   private TSDataType[] valueColumnTypes;
+  // each column of Object[] is a column of primitive type array
+  private Object[] valueColumns;
   private BitMap[] nullValueColumnBitmaps;
   private int rowCount;
 
@@ -89,7 +90,8 @@ public class TabletInsertionDataContainer {
     this.timestampColumn = new long[] {insertRowNode.getTime()};
 
     generateColumnIndexMapper(
-        insertRowNode, pattern, 
originColumnIndex2FilteredColumnIndexMapperList);
+        insertRowNode.getMeasurements(), pattern, 
originColumnIndex2FilteredColumnIndexMapperList);
+
     final int filteredColumnSize =
         Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList)
             .filter(Objects::nonNull)
@@ -98,22 +100,46 @@ public class TabletInsertionDataContainer {
 
     this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
     this.columnNameStringList = new String[filteredColumnSize];
-    this.valueColumns = new Object[filteredColumnSize];
     this.valueColumnTypes = new TSDataType[filteredColumnSize];
+    this.valueColumns = new Object[filteredColumnSize];
     this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
 
     final MeasurementSchema[] originMeasurementSchemaList = 
insertRowNode.getMeasurementSchemas();
     final String[] originColumnNameStringList = 
insertRowNode.getMeasurements();
-    final Object[] originValueColumns = insertRowNode.getValues();
     final TSDataType[] originValueColumnTypes = insertRowNode.getDataTypes();
+    final Object[] originValueColumns = insertRowNode.getValues();
 
     for (int i = 0; i < 
originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
       if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) {
         final int filteredColumnIndex = 
originColumnIndex2FilteredColumnIndexMapperList[i];
         this.measurementSchemaList[filteredColumnIndex] = 
originMeasurementSchemaList[i];
         this.columnNameStringList[filteredColumnIndex] = 
originColumnNameStringList[i];
-        this.valueColumns[filteredColumnIndex] = originValueColumns[i];
         this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
+        switch (originValueColumnTypes[i]) {
+          case INT32:
+            this.valueColumns[filteredColumnIndex] = new int[] {(Integer) 
originValueColumns[i]};
+            break;
+          case INT64:
+            this.valueColumns[filteredColumnIndex] = new long[] {(Long) 
originValueColumns[i]};
+            break;
+          case FLOAT:
+            this.valueColumns[filteredColumnIndex] = new float[] {(Float) 
originValueColumns[i]};
+            break;
+          case DOUBLE:
+            this.valueColumns[filteredColumnIndex] = new double[] {(Double) 
originValueColumns[i]};
+            break;
+          case BOOLEAN:
+            this.valueColumns[filteredColumnIndex] =
+                new boolean[] {(Boolean) originValueColumns[i]};
+            break;
+          case TEXT:
+            this.valueColumns[filteredColumnIndex] = new Binary[] {(Binary) 
originValueColumns[i]};
+            break;
+          default:
+            throw new UnSupportedDataTypeException(
+                String.format(
+                    "Data type %s is not supported.", 
originValueColumnTypes[i].toString()));
+        }
         this.nullValueColumnBitmaps[filteredColumnIndex] = new BitMap(1);
       }
     }
@@ -130,7 +156,9 @@ public class TabletInsertionDataContainer {
     this.timestampColumn = insertTabletNode.getTimes();
 
     generateColumnIndexMapper(
-        insertTabletNode, pattern, 
originColumnIndex2FilteredColumnIndexMapperList);
+        insertTabletNode.getMeasurements(),
+        pattern,
+        originColumnIndex2FilteredColumnIndexMapperList);
 
     final int filteredColumnSize =
         Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList)
@@ -140,15 +168,15 @@ public class TabletInsertionDataContainer {
 
     this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
     this.columnNameStringList = new String[filteredColumnSize];
-    this.valueColumns = new Object[filteredColumnSize];
     this.valueColumnTypes = new TSDataType[filteredColumnSize];
+    this.valueColumns = new Object[filteredColumnSize];
     this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
 
     final MeasurementSchema[] originMeasurementSchemaList =
         insertTabletNode.getMeasurementSchemas();
     final String[] originColumnNameStringList = 
insertTabletNode.getMeasurements();
-    final Object[] originValueColumns = insertTabletNode.getColumns();
     final TSDataType[] originValueColumnTypes = 
insertTabletNode.getDataTypes();
+    final Object[] originValueColumns = insertTabletNode.getColumns();
     final BitMap[] originBitMapList =
         (insertTabletNode.getBitMaps() == null
             ? IntStream.range(0, originColumnSize)
@@ -156,15 +184,19 @@ public class TabletInsertionDataContainer {
                 .map(o -> new BitMap(timestampColumn.length))
                 .toArray(BitMap[]::new)
             : insertTabletNode.getBitMaps());
+    for (int i = 0; i < originBitMapList.length; i++) {
+      if (originBitMapList[i] == null) {
+        originBitMapList[i] = new BitMap(timestampColumn.length);
+      }
+    }
 
     for (int i = 0; i < 
originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
       if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) {
         final int filteredColumnIndex = 
originColumnIndex2FilteredColumnIndexMapperList[i];
         this.measurementSchemaList[filteredColumnIndex] = 
originMeasurementSchemaList[i];
         this.columnNameStringList[filteredColumnIndex] = 
originColumnNameStringList[i];
-        this.valueColumns[filteredColumnIndex] =
-            convertToColumn(originValueColumns[i], originValueColumnTypes[i], 
originBitMapList[i]);
         this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
+        this.valueColumns[filteredColumnIndex] = originValueColumns[i];
         this.nullValueColumnBitmaps[filteredColumnIndex] = originBitMapList[i];
       }
     }
@@ -179,7 +211,13 @@ public class TabletInsertionDataContainer {
     this.deviceId = tablet.deviceId;
     this.timestampColumn = tablet.timestamps;
 
-    generateColumnIndexMapper(tablet, pattern, 
originColumnIndex2FilteredColumnIndexMapperList);
+    final List<MeasurementSchema> originMeasurementSchemaList = 
tablet.getSchemas();
+    final String[] originMeasurementList = new 
String[originMeasurementSchemaList.size()];
+    for (int i = 0; i < originMeasurementSchemaList.size(); i++) {
+      originMeasurementList[i] = 
originMeasurementSchemaList.get(i).getMeasurementId();
+    }
+    generateColumnIndexMapper(
+        originMeasurementList, pattern, 
originColumnIndex2FilteredColumnIndexMapperList);
 
     final int filteredColumnSize =
         Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList)
@@ -189,11 +227,10 @@ public class TabletInsertionDataContainer {
 
     this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
     this.columnNameStringList = new String[filteredColumnSize];
-    this.valueColumns = new Object[filteredColumnSize];
     this.valueColumnTypes = new TSDataType[filteredColumnSize];
+    this.valueColumns = new Object[filteredColumnSize];
     this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
 
-    final List<MeasurementSchema> originMeasurementSchemaList = 
tablet.getSchemas();
     final String[] originColumnNameStringList = new String[originColumnSize];
     final TSDataType[] originValueColumnTypes = new 
TSDataType[originColumnSize];
     for (int i = 0; i < originColumnSize; i++) {
@@ -201,16 +238,26 @@ public class TabletInsertionDataContainer {
       originValueColumnTypes[i] = originMeasurementSchemaList.get(i).getType();
     }
     final Object[] originValueColumns = tablet.values;
-    final BitMap[] originBitMapList = tablet.bitMaps;
+    final BitMap[] originBitMapList =
+        tablet.bitMaps == null
+            ? IntStream.range(0, originColumnSize)
+                .boxed()
+                .map(o -> new BitMap(timestampColumn.length))
+                .toArray(BitMap[]::new)
+            : tablet.bitMaps;
+    for (int i = 0; i < originBitMapList.length; i++) {
+      if (originBitMapList[i] == null) {
+        originBitMapList[i] = new BitMap(timestampColumn.length);
+      }
+    }
 
     for (int i = 0; i < 
originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
       if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) {
         final int filteredColumnIndex = 
originColumnIndex2FilteredColumnIndexMapperList[i];
         this.measurementSchemaList[filteredColumnIndex] = 
originMeasurementSchemaList.get(i);
         this.columnNameStringList[filteredColumnIndex] = 
originColumnNameStringList[i];
-        this.valueColumns[filteredColumnIndex] =
-            convertToColumn(originValueColumns[i], originValueColumnTypes[i], 
originBitMapList[i]);
         this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
+        this.valueColumns[filteredColumnIndex] = originValueColumns[i];
         this.nullValueColumnBitmaps[filteredColumnIndex] = originBitMapList[i];
       }
     }
@@ -218,6 +265,7 @@ public class TabletInsertionDataContainer {
     rowCount = tablet.rowSize;
   }
 
+  // TODO: cache the result keyed by deviceId to improve performance
   private void generateColumnIndexMapper(
       String[] originMeasurementList,
       String pattern,
@@ -251,88 +299,14 @@ public class TabletInsertionDataContainer {
     }
   }
 
-  private void generateColumnIndexMapper(
-      InsertNode insertNode,
-      String pattern,
-      Integer[] originColumnIndex2FilteredColumnIndexMapperList) {
-    generateColumnIndexMapper(
-        insertNode.getMeasurements(), pattern, 
originColumnIndex2FilteredColumnIndexMapperList);
-  }
-
-  private void generateColumnIndexMapper(
-      Tablet tablet, String pattern, Integer[] 
originColumnIndex2FilteredColumnIndexMapperList) {
-    final List<MeasurementSchema> originMeasurementSchemaList = 
tablet.getSchemas();
-    final String[] originMeasurementList = new 
String[originMeasurementSchemaList.size()];
-    for (int i = 0; i < originMeasurementSchemaList.size(); i++) {
-      originMeasurementList[i] = 
originMeasurementSchemaList.get(i).getMeasurementId();
-    }
-    generateColumnIndexMapper(
-        originMeasurementList, pattern, 
originColumnIndex2FilteredColumnIndexMapperList);
-  }
-
-  private Object convertToColumn(Object originColumn, TSDataType dataType, 
BitMap bitMap) {
-    switch (dataType) {
-      case INT32:
-        final int[] intValues = (int[]) originColumn;
-        final int[] integerValues = new int[intValues.length];
-        for (int i = 0; i < intValues.length; i++) {
-          integerValues[i] = bitMap != null && bitMap.isMarked(i) ? 0 : 
intValues[i];
-        }
-        return integerValues;
-      case INT64:
-        final long[] longValues = (long[]) originColumn;
-        final long[] longValues2 = new long[longValues.length];
-        for (int i = 0; i < longValues.length; i++) {
-          longValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 : 
longValues[i];
-        }
-        return longValues2;
-      case FLOAT:
-        final float[] floatValues = (float[]) originColumn;
-        final float[] floatValues2 = new float[floatValues.length];
-        for (int i = 0; i < floatValues.length; i++) {
-          floatValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 : 
floatValues[i];
-        }
-        return floatValues2;
-      case DOUBLE:
-        final double[] doubleValues = (double[]) originColumn;
-        final double[] doubleValues2 = new double[doubleValues.length];
-        for (int i = 0; i < doubleValues.length; i++) {
-          doubleValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 : 
doubleValues[i];
-        }
-        return doubleValues2;
-      case BOOLEAN:
-        final boolean[] booleanValues = (boolean[]) originColumn;
-        final boolean[] booleanValues2 = new boolean[booleanValues.length];
-        for (int i = 0; i < booleanValues.length; i++) {
-          booleanValues2[i] = (bitMap == null || !bitMap.isMarked(i)) && 
booleanValues[i];
-        }
-        return booleanValues2;
-      case TEXT:
-        final Binary[] binaryValues = (Binary[]) originColumn;
-        final Binary[] stringValues = new Binary[binaryValues.length];
-        for (int i = 0; i < binaryValues.length; i++) {
-          stringValues[i] =
-              bitMap != null && bitMap.isMarked(i)
-                  ? null
-                  : (binaryValues[i] == null
-                      ? null
-                      : Binary.valueOf(binaryValues[i].getStringValue()));
-        }
-        return stringValues;
-      default:
-        throw new UnSupportedDataTypeException(
-            String.format("Data type %s is not supported.", dataType));
-    }
-  }
-
   ////////////////////////////  process  ////////////////////////////
 
-  public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> 
consumer) {
-    final PipeRowCollector rowCollector = new PipeRowCollector();
-    if (valueColumns.length == 0) {
-      return new PipeEmptyTabletInsertionEvent();
+  public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, 
RowCollector> consumer) {
+    if (valueColumns.length == 0 || timestampColumn.length == 0) {
+      return Collections.emptyList();
     }
 
+    final PipeRowCollector rowCollector = new PipeRowCollector();
     for (int i = 0; i < timestampColumn.length; i++) {
       consumer.accept(
           new PipeRow(
@@ -340,18 +314,19 @@ public class TabletInsertionDataContainer {
               deviceId,
               measurementSchemaList,
               timestampColumn,
-              valueColumns,
               valueColumnTypes,
+              valueColumns,
+              nullValueColumnBitmaps,
               columnNameStringList),
           rowCollector);
     }
-    return rowCollector.toTabletInsertionEvent();
+    return rowCollector.convertToTabletInsertionEvents();
   }
 
-  public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer) {
+  public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, RowCollector> consumer) {
     final PipeRowCollector rowCollector = new PipeRowCollector();
     consumer.accept(convertToTablet(), rowCollector);
-    return rowCollector.toTabletInsertionEvent();
+    return rowCollector.convertToTabletInsertionEvents();
   }
 
   ////////////////////////////  convert  ////////////////////////////
@@ -362,10 +337,8 @@ public class TabletInsertionDataContainer {
     }
 
     final int columnSize = measurementSchemaList.length;
-
     final List<MeasurementSchema> measurementSchemaArrayList =
         new ArrayList<>(Arrays.asList(measurementSchemaList).subList(0, columnSize));
-
     final Tablet newTablet = new Tablet(deviceId, measurementSchemaArrayList, rowCount);
     newTablet.timestamps = timestampColumn;
     newTablet.bitMaps = nullValueColumnBitmaps;
@@ -373,6 +346,7 @@ public class TabletInsertionDataContainer {
     newTablet.rowSize = rowCount;
 
     tablet = newTablet;
+
     return tablet;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
index 9035a8fe0bb..fe804b31412 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.core.event.view.datastructure;
 
-import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
@@ -124,7 +124,7 @@ public class TsFileInsertionDataContainer {
 
           @Override
           public TabletInsertionEvent next() {
-            return new PipeTabletTabletInsertionEvent(tabletIterator.next());
+            return new PipeRawTabletInsertionEvent(tabletIterator.next());
           }
         };
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java
index 62979b7d52c..c9774b4618a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java
@@ -56,15 +56,23 @@ public class PipeDoNothingProcessor implements PipeProcessor {
           .equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) {
         eventCollector.collect(tabletInsertionEvent);
       } else {
-        eventCollector.collect(
-            tabletInsertionEvent.processRowByRow(
+        tabletInsertionEvent
+            .processRowByRow(
                 (row, rowCollector) -> {
                   try {
                     rowCollector.collectRow(row);
                   } catch (IOException e) {
                     throw new PipeException("Failed to collect row", e);
                   }
-                }));
+                })
+            .forEach(
+                event -> {
+                  try {
+                    eventCollector.collect(event);
+                  } catch (IOException e) {
+                    throw new PipeException("Failed to collect event", e);
+                  }
+                });
       }
     } else {
       eventCollector.collect(tabletInsertionEvent);


Reply via email to