This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 387c2102600 [IOTDB-5967] Pipe: fix convertToTablet bug and introduce PipeEmptyTabletInsertionEvent (#10044)
387c2102600 is described below

commit 387c2102600273842a1e8578c79455366581b540
Author: Itami Sho <[email protected]>
AuthorDate: Sun Jun 4 16:53:37 2023 +0800

    [IOTDB-5967] Pipe: fix convertToTablet bug and introduce PipeEmptyTabletInsertionEvent (#10044)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../iotdb/pipe/api/collector/RowCollector.java     |   5 +-
 .../impl/iotdb/v1/IoTDBThriftConnectorV1.java      |  38 +++--
 .../event/impl/PipeEmptyTabletInsertionEvent.java  |  28 ++--
 ...ava => PipeInsertNodeTabletInsertionEvent.java} |  31 +++-
 ...nt.java => PipeTabletTabletInsertionEvent.java} |   6 +-
 .../realtime/PipeRealtimeCollectEventFactory.java  |   6 +-
 .../core/event/realtime/TsFileEpochManager.java    |   6 +-
 .../db/pipe/core/event/view/access/PipeRow.java    |  22 +--
 .../event/view/collector/PipeRowCollector.java     |  10 +-
 .../TabletInsertionDataContainer.java              | 106 ++++--------
 .../TsFileInsertionDataContainer.java              |   4 +-
 .../PipeInsertNodeTabletInsertionEventTest.java    | 178 +++++++++++++++++++++
 12 files changed, 296 insertions(+), 144 deletions(-)

diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
index bdd96be926b..0518f8b2040 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
@@ -26,9 +26,8 @@ import java.io.IOException;
 import java.util.function.BiConsumer;
 
 /**
- * Used to collect rows generated by {@link 
TabletInsertionEvent#processRowByRow(BiConsumer)},{@link
- * TabletInsertionEvent#processByIterator(BiConsumer)} or {@link
- * TabletInsertionEvent#processTablet(BiConsumer)}.
+ * Used to collect rows generated by {@link 
TabletInsertionEvent#processRowByRow(BiConsumer)},
+ * {@link TabletInsertionEvent#processTablet(BiConsumer)}.
  */
 public interface RowCollector {
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
index 40c156e3285..c67e3966cc2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
@@ -33,8 +33,9 @@ import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransfe
 import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferHandshakeReq;
 import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferTabletReq;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeInsertionEvent;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.PipeConnector;
@@ -117,13 +118,15 @@ public class IoTDBThriftConnectorV1 implements 
PipeConnector {
   public void transfer(TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
     // PipeProcessor can change the type of TabletInsertionEvent
     try {
-      if (tabletInsertionEvent instanceof PipeInsertNodeInsertionEvent) {
-        doTransfer((PipeInsertNodeInsertionEvent) tabletInsertionEvent);
-      } else if (tabletInsertionEvent instanceof PipeTabletInsertionEvent) {
-        doTransfer((PipeTabletInsertionEvent) tabletInsertionEvent);
+      if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+        doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
+      } else if (tabletInsertionEvent instanceof 
PipeTabletTabletInsertionEvent) {
+        doTransfer((PipeTabletTabletInsertionEvent) tabletInsertionEvent);
+      } else if (tabletInsertionEvent instanceof 
PipeEmptyTabletInsertionEvent) {
+        doTransfer((PipeEmptyTabletInsertionEvent) tabletInsertionEvent);
       } else {
         throw new NotImplementedException(
-            "IoTDBThriftConnectorV1 only support PipeInsertNodeInsertionEvent 
and PipeTabletInsertionEvent.");
+            "IoTDBThriftConnectorV1 only support 
PipeInsertNodeTabletInsertionEvent and PipeTabletTabletInsertionEvent.");
       }
     } catch (TException e) {
       LOGGER.error(
@@ -136,35 +139,40 @@ public class IoTDBThriftConnectorV1 implements 
PipeConnector {
     }
   }
 
-  private void doTransfer(PipeInsertNodeInsertionEvent 
pipeInsertNodeInsertionEvent)
+  private void doTransfer(PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent)
       throws PipeException, TException, WALPipeException {
     final TPipeTransferResp resp =
         client.pipeTransfer(
             PipeTransferInsertNodeReq.toTPipeTransferReq(
-                pipeInsertNodeInsertionEvent.getInsertNode()));
+                pipeInsertNodeTabletInsertionEvent.getInsertNode()));
 
     if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new PipeException(
           String.format(
-              "Transfer PipeInsertNodeInsertionEvent %s error, result status 
%s",
-              pipeInsertNodeInsertionEvent, resp.status));
+              "Transfer PipeInsertNodeTabletInsertionEvent %s error, result 
status %s",
+              pipeInsertNodeTabletInsertionEvent, resp.status));
     }
   }
 
-  private void doTransfer(PipeTabletInsertionEvent pipeTabletInsertionEvent)
+  private void doTransfer(PipeTabletTabletInsertionEvent 
pipeTabletTabletInsertionEvent)
       throws PipeException, TException, IOException {
     final TPipeTransferResp resp =
         client.pipeTransfer(
-            
PipeTransferTabletReq.toTPipeTransferReq(pipeTabletInsertionEvent.convertToTablet()));
+            PipeTransferTabletReq.toTPipeTransferReq(
+                pipeTabletTabletInsertionEvent.convertToTablet()));
 
     if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new PipeException(
           String.format(
-              "Transfer PipeTabletInsertionEvent %s error, result status %s",
-              pipeTabletInsertionEvent, resp.status));
+              "Transfer PipeTabletTabletInsertionEvent %s error, result status 
%s",
+              pipeTabletTabletInsertionEvent, resp.status));
     }
   }
 
+  private void doTransfer(PipeEmptyTabletInsertionEvent 
pipeEmptyTabletInsertionEvent) {
+    // do nothing
+  }
+
   @Override
   public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
     // PipeProcessor can change the type of TabletInsertionEvent
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java
similarity index 65%
copy from 
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
copy to 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java
index bdd96be926b..855da8fa8b0 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java
@@ -17,27 +17,23 @@
  * under the License.
  */
 
-package org.apache.iotdb.pipe.api.collector;
+package org.apache.iotdb.db.pipe.core.event.impl;
 
 import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.tsfile.write.record.Tablet;
 
-import java.io.IOException;
 import java.util.function.BiConsumer;
 
-/**
- * Used to collect rows generated by {@link 
TabletInsertionEvent#processRowByRow(BiConsumer)},{@link
- * TabletInsertionEvent#processByIterator(BiConsumer)} or {@link
- * TabletInsertionEvent#processTablet(BiConsumer)}.
- */
-public interface RowCollector {
+public class PipeEmptyTabletInsertionEvent implements TabletInsertionEvent {
+  @Override
+  public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> 
consumer) {
+    return this;
+  }
 
-  /**
-   * Collects a row.
-   *
-   * @param row Row to be collected
-   * @throws IOException if any I/O errors occur
-   * @see Row
-   */
-  void collectRow(Row row) throws IOException;
+  @Override
+  public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> 
consumer) {
+    return this;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java
similarity index 83%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeInsertionEvent.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java
index 3b980f10b9d..416fea8b734 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.core.event.impl;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.core.event.view.datastructure.TabletInsertionDataContainer;
@@ -38,21 +39,23 @@ import org.slf4j.LoggerFactory;
 
 import java.util.function.BiConsumer;
 
-public class PipeInsertNodeInsertionEvent extends EnrichedEvent implements 
TabletInsertionEvent {
+public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
+    implements TabletInsertionEvent {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeInsertNodeInsertionEvent.class);
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeInsertNodeTabletInsertionEvent.class);
 
   private final WALEntryHandler walEntryHandler;
   private final ProgressIndex progressIndex;
 
   private TabletInsertionDataContainer dataContainer;
 
-  public PipeInsertNodeInsertionEvent(
+  public PipeInsertNodeTabletInsertionEvent(
       WALEntryHandler walEntryHandler, ProgressIndex progressIndex) {
     this(walEntryHandler, progressIndex, null, null);
   }
 
-  private PipeInsertNodeInsertionEvent(
+  private PipeInsertNodeTabletInsertionEvent(
       WALEntryHandler walEntryHandler,
       ProgressIndex progressIndex,
       PipeTaskMeta pipeTaskMeta,
@@ -104,9 +107,10 @@ public class PipeInsertNodeInsertionEvent extends 
EnrichedEvent implements Table
   }
 
   @Override
-  public PipeInsertNodeInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+  public PipeInsertNodeTabletInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       PipeTaskMeta pipeTaskMeta, String pattern) {
-    return new PipeInsertNodeInsertionEvent(walEntryHandler, progressIndex, 
pipeTaskMeta, pattern);
+    return new PipeInsertNodeTabletInsertionEvent(
+        walEntryHandler, progressIndex, pipeTaskMeta, pattern);
   }
 
   /////////////////////////// TabletInsertionEvent ///////////////////////////
@@ -149,11 +153,24 @@ public class PipeInsertNodeInsertionEvent extends 
EnrichedEvent implements Table
     }
   }
 
+  @TestOnly
+  public Tablet convertToTabletForTest(InsertNode insertNode, String pattern) {
+    try {
+      if (dataContainer == null) {
+        dataContainer = new TabletInsertionDataContainer(insertNode, pattern);
+      }
+      return dataContainer.convertToTablet();
+    } catch (Exception e) {
+      LOGGER.error("Process tablet error.", e);
+      throw new PipeException("Process tablet error.", e);
+    }
+  }
+
   /////////////////////////// Object ///////////////////////////
 
   @Override
   public String toString() {
-    return "PipeTabletInsertionEvent{"
+    return "PipeTabletTabletInsertionEvent{"
         + "walEntryHandler="
         + walEntryHandler
         + ", progressIndex="
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java
similarity index 92%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java
index 399b091264d..014972abd7f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java
@@ -29,18 +29,18 @@ import org.apache.iotdb.tsfile.write.record.Tablet;
 import java.util.Objects;
 import java.util.function.BiConsumer;
 
-public class PipeTabletInsertionEvent implements TabletInsertionEvent {
+public class PipeTabletTabletInsertionEvent implements TabletInsertionEvent {
 
   private final Tablet tablet;
   private final String pattern;
 
   private TabletInsertionDataContainer dataContainer;
 
-  public PipeTabletInsertionEvent(Tablet tablet) {
+  public PipeTabletTabletInsertionEvent(Tablet tablet) {
     this(Objects.requireNonNull(tablet), null);
   }
 
-  public PipeTabletInsertionEvent(Tablet tablet, String pattern) {
+  public PipeTabletTabletInsertionEvent(Tablet tablet, String pattern) {
     this.tablet = Objects.requireNonNull(tablet);
     this.pattern = pattern;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
index 61d65cd43e8..b2a622f7c05 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.core.event.realtime;
 
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeInsertionEvent;
+import 
org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.wal.utils.WALEntryHandler;
 
@@ -36,8 +36,8 @@ public class PipeRealtimeCollectEventFactory {
 
   public static PipeRealtimeCollectEvent createCollectEvent(
       WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource 
resource) {
-    return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeInsertionEvent(
-        new PipeInsertNodeInsertionEvent(walEntryHandler, 
insertNode.getProgressIndex()),
+    return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent(
+        new PipeInsertNodeTabletInsertionEvent(walEntryHandler, 
insertNode.getProgressIndex()),
         insertNode,
         resource);
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
index 3e09b86e74f..5a33649aad2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.core.event.realtime;
 
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeInsertionEvent;
+import 
org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
 
 import org.slf4j.Logger;
@@ -59,8 +59,8 @@ public class TsFileEpochManager {
         event.getPattern());
   }
 
-  public PipeRealtimeCollectEvent bindPipeInsertNodeInsertionEvent(
-      PipeInsertNodeInsertionEvent event, InsertNode node, TsFileResource 
resource) {
+  public PipeRealtimeCollectEvent bindPipeInsertNodeTabletInsertionEvent(
+      PipeInsertNodeTabletInsertionEvent event, InsertNode node, 
TsFileResource resource) {
     return new PipeRealtimeCollectEvent(
         event,
         filePath2Epoch.computeIfAbsent(resource.getTsFilePath(), 
TsFileEpoch::new),
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
index 043be5ec83f..dfab1e1aa2b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
@@ -39,7 +39,7 @@ public class PipeRow implements Row {
   private final MeasurementSchema[] measurementSchemaList;
 
   private final long[] timestampColumn;
-  private final Object[][] valueColumns;
+  private final Object[] valueColumns;
   private final TSDataType[] valueColumnTypes;
 
   private final String[] columnNameStringList;
@@ -49,7 +49,7 @@ public class PipeRow implements Row {
       String deviceId,
       MeasurementSchema[] measurementSchemaList,
       long[] timestampColumn,
-      Object[][] valueColumns,
+      Object[] valueColumns,
       TSDataType[] valueColumnTypes,
       String[] columnNameStringList) {
     this.rowIndex = rowIndex;
@@ -68,42 +68,42 @@ public class PipeRow implements Row {
 
   @Override
   public int getInt(int columnIndex) {
-    return (int) valueColumns[columnIndex][rowIndex];
+    return ((int[]) valueColumns[columnIndex])[rowIndex];
   }
 
   @Override
   public long getLong(int columnIndex) {
-    return (long) valueColumns[columnIndex][rowIndex];
+    return ((long[]) valueColumns[columnIndex])[rowIndex];
   }
 
   @Override
   public float getFloat(int columnIndex) {
-    return (float) valueColumns[columnIndex][rowIndex];
+    return ((float[]) valueColumns[columnIndex])[rowIndex];
   }
 
   @Override
   public double getDouble(int columnIndex) {
-    return (double) valueColumns[columnIndex][rowIndex];
+    return ((double[]) valueColumns[columnIndex])[rowIndex];
   }
 
   @Override
   public boolean getBoolean(int columnIndex) {
-    return (boolean) valueColumns[columnIndex][rowIndex];
+    return ((boolean[]) valueColumns[columnIndex])[rowIndex];
   }
 
   @Override
   public Binary getBinary(int columnIndex) {
-    return Binary.valueOf((String) valueColumns[columnIndex][rowIndex]);
+    return ((Binary[]) valueColumns[columnIndex])[rowIndex];
   }
 
   @Override
   public String getString(int columnIndex) {
-    return (String) valueColumns[columnIndex][rowIndex];
+    return ((Binary[]) valueColumns[columnIndex])[rowIndex].getStringValue();
   }
 
   @Override
   public Object getObject(int columnIndex) {
-    return valueColumns[columnIndex][rowIndex];
+    return ((Object[]) valueColumns[columnIndex])[rowIndex];
   }
 
   @Override
@@ -113,7 +113,7 @@ public class PipeRow implements Row {
 
   @Override
   public boolean isNull(int columnIndex) {
-    return valueColumns[columnIndex][rowIndex] == null;
+    return ((Object[]) valueColumns[columnIndex])[rowIndex] == null;
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
index 3d39e08c32a..ab0371252f3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
@@ -19,7 +19,8 @@
 
 package org.apache.iotdb.db.pipe.core.event.view.collector;
 
-import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.core.event.view.access.PipeRow;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
@@ -65,7 +66,12 @@ public class PipeRowCollector implements RowCollector {
   }
 
   public TabletInsertionEvent toTabletInsertionEvent() {
-    PipeTabletInsertionEvent tabletInsertionEvent = new 
PipeTabletInsertionEvent(tablet);
+    if (tablet == null) {
+      return new PipeEmptyTabletInsertionEvent();
+    }
+
+    PipeTabletTabletInsertionEvent tabletInsertionEvent =
+        new PipeTabletTabletInsertionEvent(tablet);
     this.tablet = null;
     return tabletInsertionEvent;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
index f61d4f623ad..16d21f73138 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.core.event.view.access.PipeRow;
 import org.apache.iotdb.db.pipe.core.event.view.collector.PipeRowCollector;
 import org.apache.iotdb.pipe.api.access.Row;
@@ -51,7 +52,7 @@ public class TabletInsertionDataContainer {
   private String[] columnNameStringList;
 
   private long[] timestampColumn;
-  private Object[][] valueColumns;
+  private Object[] valueColumns;
   private TSDataType[] valueColumnTypes;
   private BitMap[] nullValueColumnBitmaps;
   private int rowCount;
@@ -97,7 +98,7 @@ public class TabletInsertionDataContainer {
 
     this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
     this.columnNameStringList = new String[filteredColumnSize];
-    this.valueColumns = new Object[filteredColumnSize][1];
+    this.valueColumns = new Object[filteredColumnSize];
     this.valueColumnTypes = new TSDataType[filteredColumnSize];
     this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
 
@@ -111,7 +112,7 @@ public class TabletInsertionDataContainer {
         final int filteredColumnIndex = 
originColumnIndex2FilteredColumnIndexMapperList[i];
         this.measurementSchemaList[filteredColumnIndex] = 
originMeasurementSchemaList[i];
         this.columnNameStringList[filteredColumnIndex] = 
originColumnNameStringList[i];
-        this.valueColumns[filteredColumnIndex][0] = originValueColumns[i];
+        this.valueColumns[filteredColumnIndex] = originValueColumns[i];
         this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
         this.nullValueColumnBitmaps[filteredColumnIndex] = new BitMap(1);
       }
@@ -139,7 +140,7 @@ public class TabletInsertionDataContainer {
 
     this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
     this.columnNameStringList = new String[filteredColumnSize];
-    this.valueColumns = new Object[filteredColumnSize][];
+    this.valueColumns = new Object[filteredColumnSize];
     this.valueColumnTypes = new TSDataType[filteredColumnSize];
     this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
 
@@ -188,7 +189,7 @@ public class TabletInsertionDataContainer {
 
     this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
     this.columnNameStringList = new String[filteredColumnSize];
-    this.valueColumns = new Object[filteredColumnSize][];
+    this.valueColumns = new Object[filteredColumnSize];
     this.valueColumnTypes = new TSDataType[filteredColumnSize];
     this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
 
@@ -269,51 +270,53 @@ public class TabletInsertionDataContainer {
         originMeasurementList, pattern, 
originColumnIndex2FilteredColumnIndexMapperList);
   }
 
-  private Object[] convertToColumn(Object originColumn, TSDataType dataType, 
BitMap bitMap) {
+  private Object convertToColumn(Object originColumn, TSDataType dataType, 
BitMap bitMap) {
     switch (dataType) {
       case INT32:
         final int[] intValues = (int[]) originColumn;
-        final Integer[] integerValues = new Integer[intValues.length];
+        final int[] integerValues = new int[intValues.length];
         for (int i = 0; i < intValues.length; i++) {
-          integerValues[i] = bitMap != null && bitMap.isMarked(i) ? null : 
intValues[i];
+          integerValues[i] = bitMap != null && bitMap.isMarked(i) ? 0 : 
intValues[i];
         }
         return integerValues;
       case INT64:
         final long[] longValues = (long[]) originColumn;
-        final Long[] longValues2 = new Long[longValues.length];
+        final long[] longValues2 = new long[longValues.length];
         for (int i = 0; i < longValues.length; i++) {
-          longValues2[i] = bitMap != null && bitMap.isMarked(i) ? null : 
longValues[i];
+          longValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 : 
longValues[i];
         }
         return longValues2;
       case FLOAT:
         final float[] floatValues = (float[]) originColumn;
-        final Float[] floatValues2 = new Float[floatValues.length];
+        final float[] floatValues2 = new float[floatValues.length];
         for (int i = 0; i < floatValues.length; i++) {
-          floatValues2[i] = bitMap != null && bitMap.isMarked(i) ? null : 
floatValues[i];
+          floatValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 : 
floatValues[i];
         }
         return floatValues2;
       case DOUBLE:
         final double[] doubleValues = (double[]) originColumn;
-        final Double[] doubleValues2 = new Double[doubleValues.length];
+        final double[] doubleValues2 = new double[doubleValues.length];
         for (int i = 0; i < doubleValues.length; i++) {
-          doubleValues2[i] = bitMap != null && bitMap.isMarked(i) ? null : 
doubleValues[i];
+          doubleValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 : 
doubleValues[i];
         }
         return doubleValues2;
       case BOOLEAN:
         final boolean[] booleanValues = (boolean[]) originColumn;
-        final Boolean[] booleanValues2 = new Boolean[booleanValues.length];
+        final boolean[] booleanValues2 = new boolean[booleanValues.length];
         for (int i = 0; i < booleanValues.length; i++) {
-          booleanValues2[i] = bitMap != null && bitMap.isMarked(i) ? null : 
booleanValues[i];
+          booleanValues2[i] = (bitMap == null || !bitMap.isMarked(i)) && 
booleanValues[i];
         }
         return booleanValues2;
       case TEXT:
         final Binary[] binaryValues = (Binary[]) originColumn;
-        final String[] stringValues = new String[binaryValues.length];
+        final Binary[] stringValues = new Binary[binaryValues.length];
         for (int i = 0; i < binaryValues.length; i++) {
           stringValues[i] =
               bitMap != null && bitMap.isMarked(i)
                   ? null
-                  : (binaryValues[i] == null ? null : 
binaryValues[i].getStringValue());
+                  : (binaryValues[i] == null
+                      ? null
+                      : Binary.valueOf(binaryValues[i].getStringValue()));
         }
         return stringValues;
       default:
@@ -326,6 +329,10 @@ public class TabletInsertionDataContainer {
 
   public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> 
consumer) {
     final PipeRowCollector rowCollector = new PipeRowCollector();
+    if (valueColumns.length == 0) {
+      return new PipeEmptyTabletInsertionEvent();
+    }
+
     for (int i = 0; i < timestampColumn.length; i++) {
       consumer.accept(
           new PipeRow(
@@ -355,76 +362,17 @@ public class TabletInsertionDataContainer {
     }
 
     final int columnSize = measurementSchemaList.length;
-    final int rowSize = valueColumns[0].length;
 
     final List<MeasurementSchema> measurementSchemaArrayList =
         new ArrayList<>(Arrays.asList(measurementSchemaList).subList(0, 
columnSize));
 
-    final Tablet newTablet = new Tablet(deviceId, measurementSchemaArrayList, 
rowSize);
+    final Tablet newTablet = new Tablet(deviceId, measurementSchemaArrayList, 
rowCount);
     newTablet.timestamps = timestampColumn;
     newTablet.bitMaps = nullValueColumnBitmaps;
-    newTablet.values = squashFromColumnList(valueColumns, valueColumnTypes);
+    newTablet.values = valueColumns;
     newTablet.rowSize = rowCount;
 
     tablet = newTablet;
     return tablet;
   }
-
-  private Object[] squashFromColumnList(Object[][] valueColumns, TSDataType[] 
valueColumnTypes) {
-    final Object[] values = new Object[valueColumns.length];
-    for (int i = 0; i < valueColumns.length; i++) {
-      values[i] = squashFromColumn(valueColumns[i], valueColumnTypes[i]);
-    }
-    return values;
-  }
-
-  private Object squashFromColumn(Object[] valueColumn, TSDataType 
valueColumnType) {
-    switch (valueColumnType) {
-      case INT32:
-        final Integer[] intValues = (Integer[]) valueColumn;
-        final int[] intValues2 = new int[intValues.length];
-        for (int i = 0; i < intValues.length; i++) {
-          intValues2[i] = intValues[i] == null ? 0 : intValues[i];
-        }
-        return intValues2;
-      case INT64:
-        final Long[] longValues = (Long[]) valueColumn;
-        final long[] longValues2 = new long[longValues.length];
-        for (int i = 0; i < longValues.length; i++) {
-          longValues2[i] = longValues[i] == null ? 0 : longValues[i];
-        }
-        return longValues2;
-      case FLOAT:
-        final Float[] floatValues = (Float[]) valueColumn;
-        final float[] floatValues2 = new float[floatValues.length];
-        for (int i = 0; i < floatValues.length; i++) {
-          floatValues2[i] = floatValues[i] == null ? 0 : floatValues[i];
-        }
-        return floatValues2;
-      case DOUBLE:
-        final Double[] doubleValues = (Double[]) valueColumn;
-        final double[] doubleValues2 = new double[doubleValues.length];
-        for (int i = 0; i < doubleValues.length; i++) {
-          doubleValues2[i] = doubleValues[i] == null ? 0 : doubleValues[i];
-        }
-        return doubleValues2;
-      case BOOLEAN:
-        final Boolean[] booleanValues = (Boolean[]) valueColumn;
-        final boolean[] booleanValues2 = new boolean[booleanValues.length];
-        for (int i = 0; i < booleanValues.length; i++) {
-          booleanValues2[i] = booleanValues[i] != null && booleanValues[i];
-        }
-        return booleanValues2;
-      case TEXT:
-        final String[] stringValues = (String[]) valueColumn;
-        final Binary[] binaryValues = new Binary[stringValues.length];
-        for (int i = 0; i < stringValues.length; i++) {
-          binaryValues[i] = stringValues[i] == null ? null : 
Binary.valueOf(stringValues[i]);
-        }
-        return binaryValues;
-      default:
-        throw new UnSupportedDataTypeException(
-            String.format("Data type %s is not supported.", valueColumnType));
-    }
-  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
index 260814d609f..9035a8fe0bb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.core.event.view.datastructure;
 
-import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
@@ -124,7 +124,7 @@ public class TsFileInsertionDataContainer {
 
           @Override
           public TabletInsertionEvent next() {
-            return new PipeTabletInsertionEvent(tabletIterator.next());
+            return new PipeTabletTabletInsertionEvent(tabletIterator.next());
           }
         };
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeInsertNodeTabletInsertionEventTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeInsertNodeTabletInsertionEventTest.java
new file mode 100644
index 00000000000..5a211fc2cff
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeInsertNodeTabletInsertionEventTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+public class PipeInsertNodeTabletInsertionEventTest {
+
+  InsertRowNode insertRowNode;
+  InsertTabletNode insertTabletNode;
+
+  final String deviceId = "root.sg.d1";
+  final long[] times = new long[] {110L, 111L, 112L, 113L, 114L};
+  final String[] measurementIds = new String[] {"s1", "s2", "s3", "s4", "s5", "s6"};
+  final TSDataType[] dataTypes =
+      new TSDataType[] {
+        TSDataType.INT32,
+        TSDataType.INT64,
+        TSDataType.FLOAT,
+        TSDataType.DOUBLE,
+        TSDataType.BOOLEAN,
+        TSDataType.TEXT
+      };
+
+  final MeasurementSchema[] schemas = new MeasurementSchema[6];
+  final Object[] values = new Object[6];
+
+  final String pattern = "root.sg.d1";
+
+  Tablet tabletForInsertRowNode;
+  Tablet tabletForInsertTabletNode;
+
+  @Before
+  public void setUp() throws Exception {
+    createMeasurementSchema();
+    createInsertRowNode();
+    createInsertTabletNode();
+    createTablet();
+  }
+
+  private void createMeasurementSchema() {
+    for (int i = 0; i < 6; i++) {
+      schemas[i] = new MeasurementSchema(measurementIds[i], dataTypes[i]);
+    }
+  }
+
+  private void createInsertRowNode() throws IllegalPathException {
+
+    insertRowNode =
+        new InsertRowNode(
+            new PlanNodeId("plan node 1"),
+            new PartialPath(deviceId),
+            false,
+            measurementIds,
+            dataTypes,
+            schemas,
+            times[0],
+            values,
+            false);
+  }
+
+  private void createInsertTabletNode() throws IllegalPathException {
+    this.insertTabletNode =
+        new InsertTabletNode(
+            new PlanNodeId("plannode 1"),
+            new PartialPath(deviceId),
+            false,
+            measurementIds,
+            dataTypes,
+            schemas,
+            times,
+            null,
+            values,
+            times.length);
+  }
+
+  private void createTablet() {
+
+    // create tablet for insertRowNode
+    BitMap[] bitMapsForInsertRowNode = new BitMap[6];
+    for (int i = 0; i < 6; i++) {
+      bitMapsForInsertRowNode[i] = new BitMap(1);
+    }
+
+    values[0] = new int[1];
+    values[1] = new long[1];
+    values[2] = new float[1];
+    values[3] = new double[1];
+    values[4] = new boolean[1];
+    values[5] = new Binary[1];
+
+    for (int r = 0; r < 1; r++) {
+      ((int[]) values[0])[r] = 100;
+      ((long[]) values[1])[r] = 10000;
+      ((float[]) values[2])[r] = 2;
+      ((double[]) values[3])[r] = 1.0;
+      ((boolean[]) values[4])[r] = false;
+      ((Binary[]) values[5])[r] = Binary.valueOf("text");
+    }
+
+    tabletForInsertRowNode = new Tablet(deviceId, Arrays.asList(schemas), 1);
+    tabletForInsertRowNode.values = values;
+    tabletForInsertRowNode.timestamps = new long[] {times[0]};
+    tabletForInsertRowNode.rowSize = 1;
+    tabletForInsertRowNode.bitMaps = bitMapsForInsertRowNode;
+
+    // create tablet for insertTabletNode
+    BitMap[] bitMapsForInsertTabletNode = new BitMap[6];
+    for (int i = 0; i < 6; i++) {
+      bitMapsForInsertTabletNode[i] = new BitMap(times.length);
+    }
+
+    values[0] = new int[5];
+    values[1] = new long[5];
+    values[2] = new float[5];
+    values[3] = new double[5];
+    values[4] = new boolean[5];
+    values[5] = new Binary[5];
+
+    for (int r = 0; r < 5; r++) {
+      ((int[]) values[0])[r] = 100;
+      ((long[]) values[1])[r] = 10000;
+      ((float[]) values[2])[r] = 2;
+      ((double[]) values[3])[r] = 1.0;
+      ((boolean[]) values[4])[r] = false;
+      ((Binary[]) values[5])[r] = Binary.valueOf("text");
+    }
+    tabletForInsertTabletNode = new Tablet(deviceId, Arrays.asList(schemas), times.length);
+    tabletForInsertTabletNode.values = values;
+    tabletForInsertTabletNode.timestamps = times;
+    tabletForInsertTabletNode.rowSize = times.length;
+    tabletForInsertTabletNode.bitMaps = bitMapsForInsertTabletNode;
+  }
+
+  @Test
+  public void convertToTabletForTest() {
+    PipeInsertNodeTabletInsertionEvent event1 = new PipeInsertNodeTabletInsertionEvent(null, null);
+    Tablet tablet1 = event1.convertToTabletForTest(insertRowNode, pattern);
+    Assert.assertEquals(tablet1, tabletForInsertRowNode);
+
+    PipeInsertNodeTabletInsertionEvent event2 = new PipeInsertNodeTabletInsertionEvent(null, null);
+    Tablet tablet2 = event2.convertToTabletForTest(insertTabletNode, pattern);
+    Assert.assertEquals(tablet2, tabletForInsertTabletNode);
+  }
+}


Reply via email to