This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch rc/1.2.2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c688e3dbdc6e3c76acce21d31834a20a91380f03 Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Sep 27 09:57:29 2023 +0800 Revert "[IOTDB-6159] Pipe: Supported E-language payload in air gap connector/receiver (#11136) (#11167)" This reverts commit 74994a03dc7934de426fae9ef3aaba6ff68272ef. --- .../config/constant/PipeConnectorConstant.java | 4 -- .../payload/airgap/AirGapELanguageConstant.java | 34 ------------- .../payload/airgap/AirGapOneByteResponse.java | 4 -- .../protocol/airgap/IoTDBAirGapConnector.java | 21 +------- .../pipe/receiver/airgap/IoTDBAirGapReceiver.java | 58 +++++++--------------- 5 files changed, 18 insertions(+), 103 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java index 8135ae577ad..5f09012c00a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java @@ -44,10 +44,6 @@ public class PipeConnectorConstant { public static final String CONNECTOR_IOTDB_PASSWORD_KEY = "connector.password"; public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root"; - public static final String CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY = - "connector.air-gap.e-language.enable"; - public static final boolean CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_DEFAULT_VALUE = false; - public static final String CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY = "connector.air-gap.handshake-timeout-ms"; public static final int CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE = 5000; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapELanguageConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapELanguageConstant.java deleted file mode 100644 index da7ccbffc27..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapELanguageConstant.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.pipe.connector.payload.airgap; - -import java.nio.charset.StandardCharsets; - -public class AirGapELanguageConstant { - public static final byte[] E_LANGUAGE_PREFIX = - ("<!System=IoTDB Version=1.0 Code=UTF-8 Data=1.0!>" + "\n" + "<block:=Free>" + "\n") - .getBytes(StandardCharsets.UTF_8); - public static final byte[] E_LANGUAGE_SUFFIX = - ("\n" + "</block:=Free>").getBytes(StandardCharsets.UTF_8); - - private AirGapELanguageConstant() { - // Utility class - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapOneByteResponse.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapOneByteResponse.java index 8282ba479ec..b9b3bbaf086 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapOneByteResponse.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapOneByteResponse.java @@ -23,8 +23,4 @@ public class AirGapOneByteResponse { public static final byte[] OK = new byte[] {0}; public static final byte[] FAIL = new byte[] {(byte) 0xFF}; - - private AirGapOneByteResponse() { - // Utility class - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java index add1c166687..e85222c8300 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java @@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.connector.protocol.airgap; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.pipe.config.PipeConfig; -import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapELanguageConstant; import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapOneByteResponse; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq; @@ -60,8 +59,6 @@ import java.util.List; import java.util.zip.CRC32; import static org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.LONG_LEN; -import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_DEFAULT_VALUE; -import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY; @@ -75,7 +72,6 @@ public class IoTDBAirGapConnector extends IoTDBConnector { private final List<Boolean> isSocketAlive = new ArrayList<>(); private int handshakeTimeoutMs; - private boolean eLanguageEnable; private long currentClientIndex = 0; @@ -102,12 +98,6 @@ public class IoTDBAirGapConnector extends IoTDBConnector { CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE); LOGGER.info( "IoTDBAirGapConnector is customized with handshakeTimeoutMs: {}.", handshakeTimeoutMs); - - eLanguageEnable = - parameters.getBooleanOrDefault( - CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY, - CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_DEFAULT_VALUE); - LOGGER.info("IoTDBAirGapConnector is customized with eLanguageEnable: {}.", eLanguageEnable); } @Override @@ -364,8 +354,7 @@ public class IoTDBAirGapConnector extends IoTDBConnector { } final BufferedOutputStream outputStream = new BufferedOutputStream(socket.getOutputStream()); - bytes = enrichWithLengthAndChecksum(bytes); - outputStream.write(eLanguageEnable ? enrichWithELanguage(bytes) : bytes); + outputStream.write(enrichWithLengthAndChecksum(bytes)); outputStream.flush(); final byte[] response = new byte[1]; @@ -385,14 +374,6 @@ public class IoTDBAirGapConnector extends IoTDBConnector { Arrays.asList(length, length, BytesUtils.longToBytes(crc32.getValue()), bytes)); } - private byte[] enrichWithELanguage(byte[] bytes) { - return BytesUtils.concatByteArrayList( - Arrays.asList( - AirGapELanguageConstant.E_LANGUAGE_PREFIX, - bytes, - AirGapELanguageConstant.E_LANGUAGE_SUFFIX)); - } - @Override public void close() { for (int i = 0; i < sockets.size(); ++i) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java index 11dd9b29be5..dd7a7d83556 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java @@ -22,7 +22,6 @@ package org.apache.iotdb.db.pipe.receiver.airgap; import org.apache.iotdb.commons.concurrent.WrappedRunnable; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.db.pipe.agent.PipeAgent; -import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapELanguageConstant; import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapOneByteResponse; import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest; import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverAgent; @@ -47,7 +46,6 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.zip.CRC32; -import static org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.INT_LEN; import static org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.LONG_LEN; public class IoTDBAirGapReceiver extends WrappedRunnable { @@ -61,8 +59,6 @@ public class IoTDBAirGapReceiver extends WrappedRunnable { private final IPartitionFetcher partitionFetcher; private final ISchemaFetcher schemaFetcher; - private boolean isELanguagePayload; - public IoTDBAirGapReceiver(Socket socket, long receiverId) { this.socket = socket; this.receiverId = receiverId; @@ -81,7 +77,6 @@ public class IoTDBAirGapReceiver extends WrappedRunnable { try { while (!socket.isClosed()) { - isELanguagePayload = false; receive(); } LOGGER.info( @@ -115,7 +110,6 @@ public class IoTDBAirGapReceiver extends WrappedRunnable { // Removed the used checksum final ByteBuffer byteBuffer = ByteBuffer.wrap(data, LONG_LEN, data.length - LONG_LEN); - // Pseudo request, to reuse logic in IoTDBThriftReceiverAgent final AirGapPseudoTPipeTransferRequest req = (AirGapPseudoTPipeTransferRequest) @@ -172,12 +166,17 @@ public class IoTDBAirGapReceiver extends WrappedRunnable { return new byte[0]; } - final byte[] resultBuffer = new byte[length]; - readTillFull(inputStream, resultBuffer); - if (isELanguagePayload) { - skipTillEnough(inputStream, AirGapELanguageConstant.E_LANGUAGE_SUFFIX.length); + final ByteBuffer resultBuffer = ByteBuffer.allocate(length); + final byte[] readBuffer = new byte[length]; + + int alreadyReadBytes = 0; + int currentReadBytes; + while (alreadyReadBytes < length) { + currentReadBytes = inputStream.read(readBuffer, 0, length - alreadyReadBytes); + resultBuffer.put(readBuffer, 0, currentReadBytes); + alreadyReadBytes += currentReadBytes; } - return resultBuffer; + return resultBuffer.array(); } /** @@ -185,40 +184,17 @@ public class IoTDBAirGapReceiver extends WrappedRunnable { * data to read. */ private int readLength(InputStream inputStream) throws IOException { - final byte[] doubleIntLengthBytes = new byte[2 * INT_LEN]; - readTillFull(inputStream, doubleIntLengthBytes); - - // Check the header of the request, if it is an E-Language request, skip the E-Language header. - // We assert AirGapELanguageConstant.E_LANGUAGE_PREFIX.length > 2 * INT_LEN here. - if (Arrays.equals( - doubleIntLengthBytes, - BytesUtils.subBytes(AirGapELanguageConstant.E_LANGUAGE_PREFIX, 0, 2 * INT_LEN))) { - isELanguagePayload = true; - skipTillEnough( - inputStream, (long) AirGapELanguageConstant.E_LANGUAGE_PREFIX.length - 2 * INT_LEN); - return readLength(inputStream); + byte[] lengthBytes0 = new byte[4]; + if (inputStream.read(lengthBytes0) < 4) { + return 0; } - final byte[] dataLengthBytes = BytesUtils.subBytes(doubleIntLengthBytes, 0, INT_LEN); // for double check - return Arrays.equals( - dataLengthBytes, BytesUtils.subBytes(doubleIntLengthBytes, INT_LEN, INT_LEN)) - ? BytesUtils.bytesToInt(dataLengthBytes) - : 0; - } - - private void readTillFull(InputStream inputStream, byte[] readBuffer) throws IOException { - int alreadyReadBytes = 0; - while (alreadyReadBytes < readBuffer.length) { - alreadyReadBytes += - inputStream.read(readBuffer, alreadyReadBytes, readBuffer.length - alreadyReadBytes); + byte[] lengthBytes1 = new byte[4]; + if (inputStream.read(lengthBytes1) < 4) { + return 0; } - } - private void skipTillEnough(InputStream inputStream, long length) throws IOException { - int currentSkippedBytes = 0; - while (currentSkippedBytes < length) { - currentSkippedBytes += inputStream.skip(length - currentSkippedBytes); - } + return Arrays.equals(lengthBytes0, lengthBytes1) ? BytesUtils.bytesToInt(lengthBytes0) : 0; } }
