This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/1.2.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c688e3dbdc6e3c76acce21d31834a20a91380f03
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Sep 27 09:57:29 2023 +0800

    Revert "[IOTDB-6159] Pipe: Supported E-language payload in air gap connector/receiver (#11136) (#11167)"
    
    This reverts commit 74994a03dc7934de426fae9ef3aaba6ff68272ef.
---
 .../config/constant/PipeConnectorConstant.java     |  4 --
 .../payload/airgap/AirGapELanguageConstant.java    | 34 -------------
 .../payload/airgap/AirGapOneByteResponse.java      |  4 --
 .../protocol/airgap/IoTDBAirGapConnector.java      | 21 +-------
 .../pipe/receiver/airgap/IoTDBAirGapReceiver.java  | 58 +++++++---------------
 5 files changed, 18 insertions(+), 103 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 8135ae577ad..5f09012c00a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -44,10 +44,6 @@ public class PipeConnectorConstant {
   public static final String CONNECTOR_IOTDB_PASSWORD_KEY = 
"connector.password";
   public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root";
 
-  public static final String CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY =
-      "connector.air-gap.e-language.enable";
-  public static final boolean 
CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_DEFAULT_VALUE = false;
-
   public static final String CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY =
       "connector.air-gap.handshake-timeout-ms";
   public static final int CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE 
= 5000;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapELanguageConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapELanguageConstant.java
deleted file mode 100644
index da7ccbffc27..00000000000
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapELanguageConstant.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.connector.payload.airgap;
-
-import java.nio.charset.StandardCharsets;
-
-public class AirGapELanguageConstant {
-  public static final byte[] E_LANGUAGE_PREFIX =
-      ("<!System=IoTDB Version=1.0 Code=UTF-8 Data=1.0!>" + "\n" + 
"<block:=Free>" + "\n")
-          .getBytes(StandardCharsets.UTF_8);
-  public static final byte[] E_LANGUAGE_SUFFIX =
-      ("\n" + "</block:=Free>").getBytes(StandardCharsets.UTF_8);
-
-  private AirGapELanguageConstant() {
-    // Utility class
-  }
-}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapOneByteResponse.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapOneByteResponse.java
index 8282ba479ec..b9b3bbaf086 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapOneByteResponse.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapOneByteResponse.java
@@ -23,8 +23,4 @@ public class AirGapOneByteResponse {
 
   public static final byte[] OK = new byte[] {0};
   public static final byte[] FAIL = new byte[] {(byte) 0xFF};
-
-  private AirGapOneByteResponse() {
-    // Utility class
-  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index add1c166687..e85222c8300 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.connector.protocol.airgap;
 
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import 
org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapELanguageConstant;
 import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapOneByteResponse;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
@@ -60,8 +59,6 @@ import java.util.List;
 import java.util.zip.CRC32;
 
 import static org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.LONG_LEN;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_DEFAULT_VALUE;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY;
 
@@ -75,7 +72,6 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
   private final List<Boolean> isSocketAlive = new ArrayList<>();
 
   private int handshakeTimeoutMs;
-  private boolean eLanguageEnable;
 
   private long currentClientIndex = 0;
 
@@ -102,12 +98,6 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
             CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE);
     LOGGER.info(
         "IoTDBAirGapConnector is customized with handshakeTimeoutMs: {}.", 
handshakeTimeoutMs);
-
-    eLanguageEnable =
-        parameters.getBooleanOrDefault(
-            CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY,
-            CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_DEFAULT_VALUE);
-    LOGGER.info("IoTDBAirGapConnector is customized with eLanguageEnable: 
{}.", eLanguageEnable);
   }
 
   @Override
@@ -364,8 +354,7 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
     }
 
     final BufferedOutputStream outputStream = new 
BufferedOutputStream(socket.getOutputStream());
-    bytes = enrichWithLengthAndChecksum(bytes);
-    outputStream.write(eLanguageEnable ? enrichWithELanguage(bytes) : bytes);
+    outputStream.write(enrichWithLengthAndChecksum(bytes));
     outputStream.flush();
 
     final byte[] response = new byte[1];
@@ -385,14 +374,6 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
         Arrays.asList(length, length, 
BytesUtils.longToBytes(crc32.getValue()), bytes));
   }
 
-  private byte[] enrichWithELanguage(byte[] bytes) {
-    return BytesUtils.concatByteArrayList(
-        Arrays.asList(
-            AirGapELanguageConstant.E_LANGUAGE_PREFIX,
-            bytes,
-            AirGapELanguageConstant.E_LANGUAGE_SUFFIX));
-  }
-
   @Override
   public void close() {
     for (int i = 0; i < sockets.size(); ++i) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
index 11dd9b29be5..dd7a7d83556 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.pipe.receiver.airgap;
 import org.apache.iotdb.commons.concurrent.WrappedRunnable;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import 
org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapELanguageConstant;
 import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapOneByteResponse;
 import 
org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
 import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverAgent;
@@ -47,7 +46,6 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.zip.CRC32;
 
-import static org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.INT_LEN;
 import static org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.LONG_LEN;
 
 public class IoTDBAirGapReceiver extends WrappedRunnable {
@@ -61,8 +59,6 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
   private final IPartitionFetcher partitionFetcher;
   private final ISchemaFetcher schemaFetcher;
 
-  private boolean isELanguagePayload;
-
   public IoTDBAirGapReceiver(Socket socket, long receiverId) {
     this.socket = socket;
     this.receiverId = receiverId;
@@ -81,7 +77,6 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
 
     try {
       while (!socket.isClosed()) {
-        isELanguagePayload = false;
         receive();
       }
       LOGGER.info(
@@ -115,7 +110,6 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
 
       // Removed the used checksum
       final ByteBuffer byteBuffer = ByteBuffer.wrap(data, LONG_LEN, 
data.length - LONG_LEN);
-
       // Pseudo request, to reuse logic in IoTDBThriftReceiverAgent
       final AirGapPseudoTPipeTransferRequest req =
           (AirGapPseudoTPipeTransferRequest)
@@ -172,12 +166,17 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
       return new byte[0];
     }
 
-    final byte[] resultBuffer = new byte[length];
-    readTillFull(inputStream, resultBuffer);
-    if (isELanguagePayload) {
-      skipTillEnough(inputStream, 
AirGapELanguageConstant.E_LANGUAGE_SUFFIX.length);
+    final ByteBuffer resultBuffer = ByteBuffer.allocate(length);
+    final byte[] readBuffer = new byte[length];
+
+    int alreadyReadBytes = 0;
+    int currentReadBytes;
+    while (alreadyReadBytes < length) {
+      currentReadBytes = inputStream.read(readBuffer, 0, length - 
alreadyReadBytes);
+      resultBuffer.put(readBuffer, 0, currentReadBytes);
+      alreadyReadBytes += currentReadBytes;
     }
-    return resultBuffer;
+    return resultBuffer.array();
   }
 
   /**
@@ -185,40 +184,17 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
    * data to read.
    */
   private int readLength(InputStream inputStream) throws IOException {
-    final byte[] doubleIntLengthBytes = new byte[2 * INT_LEN];
-    readTillFull(inputStream, doubleIntLengthBytes);
-
-    // Check the header of the request, if it is an E-Language request, skip 
the E-Language header.
-    // We assert AirGapELanguageConstant.E_LANGUAGE_PREFIX.length > 2 * 
INT_LEN here.
-    if (Arrays.equals(
-        doubleIntLengthBytes,
-        BytesUtils.subBytes(AirGapELanguageConstant.E_LANGUAGE_PREFIX, 0, 2 * 
INT_LEN))) {
-      isELanguagePayload = true;
-      skipTillEnough(
-          inputStream, (long) AirGapELanguageConstant.E_LANGUAGE_PREFIX.length 
- 2 * INT_LEN);
-      return readLength(inputStream);
+    byte[] lengthBytes0 = new byte[4];
+    if (inputStream.read(lengthBytes0) < 4) {
+      return 0;
     }
 
-    final byte[] dataLengthBytes = BytesUtils.subBytes(doubleIntLengthBytes, 
0, INT_LEN);
     // for double check
-    return Arrays.equals(
-            dataLengthBytes, BytesUtils.subBytes(doubleIntLengthBytes, 
INT_LEN, INT_LEN))
-        ? BytesUtils.bytesToInt(dataLengthBytes)
-        : 0;
-  }
-
-  private void readTillFull(InputStream inputStream, byte[] readBuffer) throws 
IOException {
-    int alreadyReadBytes = 0;
-    while (alreadyReadBytes < readBuffer.length) {
-      alreadyReadBytes +=
-          inputStream.read(readBuffer, alreadyReadBytes, readBuffer.length - 
alreadyReadBytes);
+    byte[] lengthBytes1 = new byte[4];
+    if (inputStream.read(lengthBytes1) < 4) {
+      return 0;
     }
-  }
 
-  private void skipTillEnough(InputStream inputStream, long length) throws 
IOException {
-    int currentSkippedBytes = 0;
-    while (currentSkippedBytes < length) {
-      currentSkippedBytes += inputStream.skip(length - currentSkippedBytes);
-    }
+    return Arrays.equals(lengthBytes0, lengthBytes1) ? 
BytesUtils.bytesToInt(lengthBytes0) : 0;
   }
 }

Reply via email to